use crate::error::{Error, Result};
pub const TS_PACKET_SIZE: usize = 188;
pub const TS_SYNC_BYTE: u8 = 0x47;
const MAX_SECTION_SIZE: usize = 4098;
const TEI_MASK: u8 = 0x80;
const PUSI_MASK: u8 = 0x40;
pub const PID_MASK_HI: u8 = 0x1F;
pub const SCRAMBLING_MASK: u8 = 0xC0;
pub const ADAPTATION_FLAG: u8 = 0x20;
pub const PAYLOAD_FLAG: u8 = 0x10;
pub const CC_MASK: u8 = 0x0F;
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct TsHeader {
pub tei: bool,
pub pusi: bool,
pub pid: u16,
pub scrambling: u8,
pub has_adaptation: bool,
pub has_payload: bool,
pub continuity_counter: u8,
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct TsPacket<'a> {
pub header: TsHeader,
pub payload: Option<&'a [u8]>,
#[cfg_attr(feature = "serde", serde(skip))]
pub raw: &'a [u8; TS_PACKET_SIZE],
}
impl TsHeader {
pub fn parse(raw4: &[u8]) -> Option<Self> {
if raw4.len() < 4 {
return None;
}
let b1 = raw4[1];
let b2 = raw4[2];
let b3 = raw4[3];
let tei = (b1 & TEI_MASK) != 0;
let pusi = (b1 & PUSI_MASK) != 0;
let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
let has_payload = (b3 & PAYLOAD_FLAG) != 0;
let continuity_counter = b3 & CC_MASK;
Some(Self {
tei,
pusi,
pid,
scrambling,
has_adaptation,
has_payload,
continuity_counter,
})
}
pub fn serialize_into(&self, buf: &mut [u8]) {
assert!(
buf.len() >= 4,
"buffer must have at least 4 bytes for TS header"
);
buf[0] = TS_SYNC_BYTE;
buf[1] = 0;
if self.tei {
buf[1] |= TEI_MASK;
}
if self.pusi {
buf[1] |= PUSI_MASK;
}
buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
buf[2] = (self.pid & 0xFF) as u8;
buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
if self.has_adaptation {
buf[3] |= ADAPTATION_FLAG;
}
if self.has_payload {
buf[3] |= PAYLOAD_FLAG;
}
buf[3] |= self.continuity_counter & CC_MASK;
}
}
impl<'a> TsPacket<'a> {
pub fn parse(buf: &'a [u8]) -> Result<Self> {
if buf.len() < TS_PACKET_SIZE {
return Err(Error::BufferTooShort {
need: TS_PACKET_SIZE,
have: buf.len(),
what: "TsPacket::parse",
});
}
if buf[0] != TS_SYNC_BYTE {
return Err(Error::InvalidSyncByte { found: buf[0] });
}
let raw: &[u8; TS_PACKET_SIZE] =
buf[..TS_PACKET_SIZE]
.try_into()
.map_err(|_| Error::BufferTooShort {
need: TS_PACKET_SIZE,
have: buf.len(),
what: "TsPacket::parse (array conversion)",
})?;
let header = TsHeader::parse(&raw[..4])
.expect("raw is 188 bytes so first 4 bytes are always present");
let mut cursor = 4usize;
let mut payload = None;
if header.has_adaptation && cursor < TS_PACKET_SIZE {
let af_len = raw[cursor] as usize;
cursor += 1 + af_len;
}
if header.has_payload && cursor < TS_PACKET_SIZE {
payload = Some(&raw[cursor..]);
}
Ok(TsPacket {
header,
payload,
raw,
})
}
}
#[derive(Default)]
pub struct SectionReassembler {
buf: bytes::BytesMut,
expected: usize,
ready: std::collections::VecDeque<bytes::Bytes>,
}
impl SectionReassembler {
pub fn feed(&mut self, payload: &[u8], pusi: bool) {
if pusi {
if payload.is_empty() {
self.buf.clear();
self.expected = 0;
return;
}
let pointer = payload[0] as usize;
if !self.buf.is_empty() && pointer > 0 {
let avail = payload.len() - 1;
let tail_len = pointer.min(avail);
if self.buf.len() + tail_len > MAX_SECTION_SIZE {
self.buf.clear();
self.expected = 0;
} else {
self.buf.extend_from_slice(&payload[1..1 + tail_len]);
self.drain_complete_sections();
}
}
self.buf.clear();
self.expected = 0;
let start = 1 + pointer;
if start >= payload.len() {
return;
}
let new_data = &payload[start..];
if new_data.len() > MAX_SECTION_SIZE {
return;
}
self.buf.extend_from_slice(new_data);
} else {
if self.buf.is_empty() {
return;
}
if self.buf.len() + payload.len() > MAX_SECTION_SIZE {
self.buf.clear();
self.expected = 0;
return;
}
self.buf.extend_from_slice(payload);
}
self.drain_complete_sections();
}
fn drain_complete_sections(&mut self) {
loop {
if self.buf.len() < 3 {
self.expected = 0;
break;
}
if self.buf[0] == 0xFF {
self.buf.clear();
self.expected = 0;
break;
}
let exp = 3 + (((self.buf[1] & 0x0F) as usize) << 8 | self.buf[2] as usize);
if self.buf.len() >= exp {
let section = self.buf.split_to(exp).freeze();
self.ready.push_back(section);
self.expected = 0;
} else {
self.expected = exp;
break;
}
}
}
pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
self.ready.pop_front()
}
pub fn len(&self) -> usize {
self.buf.len()
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
let mut pkt = [0u8; TS_PACKET_SIZE];
pkt[0] = TS_SYNC_BYTE;
pkt[1] = b1;
pkt[2] = b2;
pkt[3] = b3;
let payload_start = 4;
let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
let len = (end - payload_start).min(payload_data.len());
pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
pkt
}
#[test]
fn parse_rejects_non_0x47_sync_byte() {
let mut pkt = [0u8; TS_PACKET_SIZE];
pkt[0] = 0x46; let err = TsPacket::parse(&pkt).unwrap_err();
match err {
Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
other => panic!("expected InvalidSyncByte, got {other:?}"),
}
}
#[test]
fn parse_extracts_pid_and_continuity_counter() {
let pkt = make_packet(0x12, 0x34, 0x05, &[]);
let pkt = TsPacket::parse(&pkt).unwrap();
assert_eq!(pkt.header.pid, 0x1234);
assert_eq!(pkt.header.continuity_counter, 5);
}
#[test]
fn payload_unit_start_indicator_flag_extracted() {
let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
let pkt1 = TsPacket::parse(&pkt1).unwrap();
assert!(pkt1.header.pusi);
let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
let pkt2 = TsPacket::parse(&pkt2).unwrap();
assert!(!pkt2.header.pusi);
}
fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
assert_eq!(pointer_field as usize, previous_tail.len());
let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
v.push(pointer_field);
v.extend_from_slice(previous_tail);
v.extend_from_slice(section);
v
}
fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
let section_length = body_after_length.len() as u16;
let mut v = Vec::with_capacity(3 + section_length as usize);
v.push(table_id);
v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
v.push((section_length & 0xFF) as u8);
v.extend_from_slice(body_after_length);
v
}
#[test]
fn reassembler_accumulates_multi_packet_section() {
let body = vec![0xAAu8; 197];
let section = build_section(0x02, &body);
assert_eq!(section.len(), 200);
let first_chunk = 100;
let payload1 = build_pusi_payload(0, &[], §ion[..first_chunk]);
let payload2 = section[first_chunk..].to_vec();
let mut reasm = SectionReassembler::default();
reasm.feed(&payload1, true);
reasm.feed(&payload2, false);
let out = reasm.pop_section().expect("section should be ready");
assert_eq!(out.len(), 200);
assert_eq!(out.as_ref(), §ion[..]);
}
#[test]
fn reassembler_yields_complete_section_once_length_satisfied() {
let section = build_section(0x42, &[0xAA]);
assert_eq!(section.len(), 4);
let payload = build_pusi_payload(0, &[], §ion);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload, true);
let out = reasm
.pop_section()
.expect("single-packet section should pop");
assert_eq!(out.as_ref(), §ion[..]);
}
#[test]
fn reassembler_extracts_all_concatenated_sections_in_one_payload() {
let s1 = build_section(0x42, &[0x11, 0x22]); let s2 = build_section(0x46, &[0x33]); let s3 = build_section(0x4A, &[0x44, 0x55, 0x66]);
let mut concat = Vec::new();
concat.extend_from_slice(&s1);
concat.extend_from_slice(&s2);
concat.extend_from_slice(&s3);
let payload = build_pusi_payload(0, &[], &concat);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload, true);
let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
assert_eq!(got.len(), 3, "all three concatenated sections must pop");
assert_eq!(got[0].as_ref(), &s1[..]);
assert_eq!(got[1].as_ref(), &s2[..]);
assert_eq!(got[2].as_ref(), &s3[..]);
}
#[test]
fn reassembler_stops_at_stuffing_after_concatenated_sections() {
let s1 = build_section(0x42, &[0xAA]); let s2 = build_section(0x46, &[0xBB, 0xCC]); let mut concat = Vec::new();
concat.extend_from_slice(&s1);
concat.extend_from_slice(&s2);
concat.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); let payload = build_pusi_payload(0, &[], &concat);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload, true);
let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
assert_eq!(got.len(), 2);
assert_eq!(got[0].as_ref(), &s1[..]);
assert_eq!(got[1].as_ref(), &s2[..]);
assert!(
reasm.is_empty(),
"stuffing tail must be discarded, not buffered"
);
}
#[test]
fn reassembler_concatenated_then_spanning_tail() {
let s1 = build_section(0x42, &[0x01, 0x02]); let s2 = build_section(0x46, &[0x09u8; 60]); let split = 30;
let mut head = Vec::new();
head.extend_from_slice(&s1);
head.extend_from_slice(&s2[..split]);
let payload1 = build_pusi_payload(0, &[], &head);
let payload2 = s2[split..].to_vec();
let mut reasm = SectionReassembler::default();
reasm.feed(&payload1, true);
let first = reasm.pop_section().expect("first section pops at once");
assert_eq!(first.as_ref(), &s1[..]);
assert!(reasm.pop_section().is_none(), "second is still partial");
reasm.feed(&payload2, false);
let second = reasm.pop_section().expect("second pops after continuation");
assert_eq!(second.as_ref(), &s2[..]);
}
#[test]
fn reassembler_completes_section_spanning_into_pusi_packet() {
let spanning = build_section(0x42, &[0x5Au8; 62]); let head = 41;
let tail = &spanning[head..]; assert_eq!(tail.len(), 24);
let next = build_section(0x46, &[0x77, 0x88]);
let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
let payload_b = build_pusi_payload(24, tail, &next);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload_a, true);
assert!(reasm.pop_section().is_none(), "head alone is incomplete");
reasm.feed(&payload_b, true);
let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
assert_eq!(got.len(), 2, "spanning section + new section must both pop");
assert_eq!(
got[0].as_ref(),
&spanning[..],
"spanning section completed from B's pointer tail"
);
assert_eq!(got[1].as_ref(), &next[..]);
}
#[test]
fn reassembler_pusi_pointer_spans_whole_payload() {
let spanning = build_section(0x42, &[0x33u8; 40]); let head = 20;
let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
let tail = &spanning[head..];
let mut reasm = SectionReassembler::default();
reasm.feed(&payload_a, true);
reasm.feed(&payload_b_pointer_only(tail), true);
let out = reasm.pop_section().expect("spanning section completes");
assert_eq!(out.as_ref(), &spanning[..]);
assert!(reasm.pop_section().is_none());
}
fn payload_b_pointer_only(tail: &[u8]) -> Vec<u8> {
let mut v = Vec::with_capacity(1 + tail.len());
v.push(tail.len() as u8);
v.extend_from_slice(tail);
v
}
#[test]
fn reassembler_discards_on_buffer_overflow() {
let mut section = Vec::with_capacity(3 + 4095);
section.push(0x00); section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
section.push(0xFF);
section.extend_from_slice(&[0u8; 160]);
let payload1 = build_pusi_payload(0, &[], §ion);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload1, true);
assert!(reasm.pop_section().is_none());
let filler = vec![0u8; 180];
for _ in 0..(MAX_SECTION_SIZE / 180 + 1) {
reasm.feed(&filler, false);
}
assert!(
reasm.pop_section().is_none(),
"no section should pop after overflow reset"
);
let valid_section = build_section(0x00, &[0xAA]);
let payload2 = build_pusi_payload(0, &[], &valid_section);
reasm.feed(&payload2, true);
let out = reasm
.pop_section()
.expect("fresh section should pop after reset");
assert_eq!(out.as_ref(), &valid_section[..]);
}
#[test]
fn reassembler_handles_pusi_with_nonzero_pointer_field() {
let prior_tail = vec![0x11, 0x22, 0x33];
let new_section = build_section(0x02, &[0xBB]);
assert_eq!(new_section.len(), 4);
let payload = build_pusi_payload(3, &prior_tail, &new_section);
let mut reasm = SectionReassembler::default();
reasm.feed(&payload, true);
let out = reasm
.pop_section()
.expect("section after pointer_field skip should pop");
assert_eq!(out.as_ref(), &new_section[..]);
}
#[test]
fn reassembler_ignores_continuation_before_pusi() {
let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);
let mut reasm = SectionReassembler::default();
reasm.feed(&pkt[4..], false);
assert!(
reasm.pop_section().is_none(),
"no section should appear without prior PUSI"
);
assert!(
reasm.pop_section().is_none(),
"second pop should also be none"
);
}
#[test]
fn reassembler_empty_pusi_payload_does_not_panic() {
let mut reasm = SectionReassembler::default();
reasm.feed(&[], true);
assert!(reasm.pop_section().is_none());
let mut payload = vec![0x00u8, 0x72, 0x70, 0x01, 0x00];
payload.resize(5, 0);
reasm.feed(&payload, true);
assert!(reasm.pop_section().is_some());
}
#[test]
fn reassembler_accepts_maximal_private_section() {
let mut section = vec![0x80u8, 0x7F, 0xFF]; section.resize(3 + 0xFFF, 0xAB);
let mut reasm = SectionReassembler::default();
let mut first = vec![0x00];
first.extend_from_slice(§ion[..183]);
reasm.feed(&first, true);
for chunk in section[183..].chunks(184) {
reasm.feed(chunk, false);
}
let out = reasm.pop_section().expect("4098-byte section should pop");
assert_eq!(out.len(), 4098);
assert_eq!(out.as_ref(), §ion[..]);
}
}