use crate::*;
use tokio_util::codec::Decoder;
pub struct TsPacket {
pub header: TransportStreamHeader,
pub adaptation_field: Option<AdaptationField>,
pub payload: Bytes,
}
impl std::fmt::Debug for TsPacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TsPacket")
.field("header", &self.header)
.field("adaptation_field", &self.adaptation_field)
.field("payload_len", &self.payload.len())
.finish()
}
}
impl TsPacket {
pub const PACKET_SIZE: usize = 188;
pub fn from_bytes(data: Bytes) -> Option<Self> {
if data.len() < Self::PACKET_SIZE {
return None;
}
let header =
TransportStreamHeader::from_bits(u32::from_be_bytes(data.get(0..4)?.try_into().ok()?));
if header.sync_byte() != 0x47 {
return None;
}
let mut adaption_field = None;
let mut index = 4;
if header.adaptation_field() {
let adaption_field_length = *data.get(index)? as usize;
index += 1;
if index + adaption_field_length > data.len() {
return None;
}
if adaption_field_length > 0 {
let field_data = data.slice(index..index + adaption_field_length);
index += adaption_field_length;
adaption_field = Some(AdaptationField::from_bytes(field_data)?);
}
}
Some(Self {
header,
adaptation_field: adaption_field,
payload: data.slice(index..Self::PACKET_SIZE),
})
}
}
pub struct TsPacketDecoder {
pub stream_position: u64,
}
impl TsPacketDecoder {
pub fn new(stream_position: u64) -> Self {
Self { stream_position }
}
}
impl Decoder for TsPacketDecoder {
type Item = (u64, TsPacket);
type Error = TsPacketError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
if src.len() < TsPacket::PACKET_SIZE {
return Ok(None);
}
loop {
while !src.is_empty() && src[0] != 0x47 {
self.stream_position += 1;
src.advance(1);
}
if src.len() < TsPacket::PACKET_SIZE {
return Ok(None);
}
if src.len() > TsPacket::PACKET_SIZE && src[TsPacket::PACKET_SIZE] != 0x47 {
self.stream_position += 1;
src.advance(1);
continue;
}
break;
}
let position = self.stream_position;
self.stream_position += TsPacket::PACKET_SIZE as u64;
let packet = TsPacket::from_bytes(src.split_to(TsPacket::PACKET_SIZE).freeze())
.ok_or(TsPacketError::InvalidPacket)?;
Ok(Some((position, packet)))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_packet(pid: u16, cc: u8, fill: u8) -> [u8; TsPacket::PACKET_SIZE] {
let mut buf = [fill; TsPacket::PACKET_SIZE];
buf[0] = 0x47;
buf[1] = (pid >> 8) as u8 & 0x1F; buf[2] = pid as u8; buf[3] = 0x10 | (cc & 0x0F);
buf
}
fn make_packet_with_af(pid: u16, af_len: u8, af_flags: u8) -> [u8; TsPacket::PACKET_SIZE] {
let mut buf = [0xFF; TsPacket::PACKET_SIZE];
buf[0] = 0x47;
buf[1] = (pid >> 8) as u8 & 0x1F;
buf[2] = pid as u8;
buf[3] = 0x30;
buf[4] = af_len;
if af_len > 0 {
buf[5] = af_flags;
}
buf
}
#[test]
fn test_from_bytes_valid_payload_only() {
let pkt = make_packet(0x100, 3, 0xAB);
let ts = TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).unwrap();
assert_eq!(ts.header.pid(), 0x100);
assert_eq!(ts.header.continuity_counter(), 3);
assert!(ts.header.payload());
assert!(!ts.header.adaptation_field());
assert!(ts.adaptation_field.is_none());
assert_eq!(ts.payload.len(), 184); }
#[test]
fn test_from_bytes_with_adaptation_field() {
let pkt = make_packet_with_af(0x01, 7, 0x10); let ts = TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).unwrap();
assert!(ts.header.adaptation_field());
assert!(ts.adaptation_field.is_some());
let af = ts.adaptation_field.unwrap();
assert!(af.flags.pcr_flag());
}
#[test]
fn test_from_bytes_bad_sync_returns_none() {
let mut pkt = make_packet(0x00, 0, 0);
pkt[0] = 0x00; assert!(TsPacket::from_bytes(Bytes::copy_from_slice(&pkt)).is_none());
}
#[test]
fn test_from_bytes_too_short_returns_none() {
assert!(TsPacket::from_bytes(Bytes::from_static(&[0x47, 0x00, 0x00, 0x10])).is_none());
}
#[test]
fn test_decoder_not_enough_data() {
let mut decoder = TsPacketDecoder::new(0);
let mut buf = BytesMut::from(&[0x47u8; 100][..]);
let result = decoder.decode(&mut buf).unwrap();
assert!(result.is_none());
assert_eq!(buf.len(), 100);
}
#[test]
fn test_decoder_exact_packet() {
let mut decoder = TsPacketDecoder::new(0);
let pkt = make_packet(0x20, 5, 0x00);
let mut buf = BytesMut::from(&pkt[..]);
let result = decoder.decode(&mut buf).unwrap();
assert!(result.is_some());
let (pos, ts) = result.unwrap();
assert_eq!(pos, 0);
assert_eq!(ts.header.pid(), 0x20);
assert_eq!(ts.header.continuity_counter(), 5);
assert_eq!(buf.len(), 0);
}
#[test]
fn test_decoder_skips_garbage_before_sync() {
let mut decoder = TsPacketDecoder::new(0);
let pkt1 = make_packet(0x30, 7, 0xCC);
let pkt2 = make_packet(0x31, 0, 0x00);
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x00, 0xFF, 0xAA]); buf.extend_from_slice(&pkt1);
buf.extend_from_slice(&pkt2); let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos, 3); assert_eq!(ts.header.pid(), 0x30);
assert_eq!(buf.len(), 188); }
#[test]
fn test_decoder_two_packets_sequential() {
let mut decoder = TsPacketDecoder::new(0);
let pkt1 = make_packet(0x100, 0, 0x11);
let pkt2 = make_packet(0x200, 1, 0x22);
let mut buf = BytesMut::new();
buf.extend_from_slice(&pkt1);
buf.extend_from_slice(&pkt2);
let (pos1, ts1) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos1, 0);
assert_eq!(ts1.header.pid(), 0x100);
assert_eq!(buf.len(), 188);
let (pos2, ts2) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos2, 188);
assert_eq!(ts2.header.pid(), 0x200);
assert_eq!(buf.len(), 0);
}
#[test]
fn test_decoder_partial_then_complete() {
let mut decoder = TsPacketDecoder::new(0);
let pkt = make_packet(0x42, 2, 0xDD);
let mut buf = BytesMut::new();
buf.extend_from_slice(&pkt[..100]);
assert!(decoder.decode(&mut buf).unwrap().is_none());
buf.extend_from_slice(&pkt[100..]);
let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos, 0);
assert_eq!(ts.header.pid(), 0x42);
assert_eq!(buf.len(), 0);
}
#[test]
fn test_decoder_empty_buffer() {
let mut decoder = TsPacketDecoder::new(0);
let mut buf = BytesMut::new();
assert!(decoder.decode(&mut buf).unwrap().is_none());
}
#[test]
fn test_decoder_with_initial_stream_position() {
let mut decoder = TsPacketDecoder::new(1000);
let pkt1 = make_packet(0x50, 0, 0x00);
let pkt2 = make_packet(0x51, 1, 0x00);
let mut buf = BytesMut::new();
buf.extend_from_slice(&pkt1);
buf.extend_from_slice(&pkt2);
let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos, 1000);
assert_eq!(ts.header.pid(), 0x50);
assert_eq!(decoder.stream_position, 1188);
}
#[test]
fn test_decoder_false_sync_byte_skipped() {
let mut decoder = TsPacketDecoder::new(0);
let pkt = make_packet(0x60, 0, 0x00);
let mut buf = BytesMut::new();
buf.extend_from_slice(&[0x47]); buf.extend_from_slice(&[0x00; 187]); buf.extend_from_slice(&[0x00]); buf.extend_from_slice(&[0x00; 187]); let pkt2 = make_packet(0x61, 1, 0x00);
buf.extend_from_slice(&pkt);
buf.extend_from_slice(&pkt2);
let (pos, ts) = decoder.decode(&mut buf).unwrap().unwrap();
assert_eq!(pos, 376);
assert_eq!(ts.header.pid(), 0x60);
}
}