use libmpegts::{
pes::{
EsFrame,
PesHeader,
PesPacketizer,
PtsDts,
STREAM_ID_AUDIO,
STREAM_ID_VIDEO,
Timestamp,
},
ts::{
PACKET_SIZE,
SYNC_BYTE,
TsPacketRef,
},
};
/// A header without timestamps serializes to exactly 9 bytes: start-code
/// prefix, stream id, two zero length bytes, two flag bytes and a zero
/// "optional bytes" count.
#[test]
fn test_pes_header_write_no_timestamp() {
    let mut out = [0u8; 32];
    let len = PesHeader::new(STREAM_ID_VIDEO).write(&mut out);
    assert_eq!(len, 9);

    // Bytes 0..4: 00 00 01 prefix followed by the stream id.
    assert_eq!(out[.. 4], [0x00, 0x00, 0x01, STREAM_ID_VIDEO]);
    // Bytes 4..6: the length field is written as zero.
    assert_eq!(out[4 .. 6], [0x00u8, 0x00u8]);
    // Byte 6: the top two bits carry the fixed `10` pattern.
    assert_eq!(out[6] >> 6, 0b10);
    // Byte 7: neither timestamp flag is set.
    assert_eq!(out[7] & 0xC0, 0x00);
    // Byte 8: no optional header bytes follow.
    assert_eq!(out[8], 0);
}
/// A PTS-only header is 14 bytes: the 9-byte prefix plus one 5-byte
/// timestamp that must decode back to the value it was built with.
#[test]
fn test_pes_header_write_pts_only() {
    let expected_pts = 90000u64;
    let header = PesHeader::new(STREAM_ID_AUDIO).with_pts_dts(PtsDts::new(expected_pts));

    let mut out = [0u8; 32];
    let len = header.write(&mut out);

    assert_eq!(len, 14);
    // Top two bits of byte 7 are `10`: a PTS is present, no DTS.
    assert_eq!(out[7] & 0xC0, 0x80);
    // Byte 8 counts the bytes that follow it: one 5-byte timestamp.
    assert_eq!(out[8], 5);
    assert_eq!(decode_timestamp(&out[9 .. 14]), expected_pts);
}
/// A header carrying both PTS and DTS is 19 bytes: the 9-byte prefix plus
/// two 5-byte timestamps, with the PTS written before the DTS.
#[test]
fn test_pes_header_write_pts_dts() {
    let expected_pts = 180000u64;
    let expected_dts = 90000u64;
    let timing = PtsDts::new(expected_pts).with_dts(expected_dts);
    let header = PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(timing);

    let mut out = [0u8; 32];
    let len = header.write(&mut out);

    assert_eq!(len, 19);
    // Top two bits of byte 7 are `11`: both timestamps are present.
    assert_eq!(out[7] & 0xC0, 0xC0);
    // Byte 8 counts the bytes that follow it: two 5-byte timestamps.
    assert_eq!(out[8], 10);

    // Bytes 9..14 hold the PTS, bytes 14..19 the DTS.
    let (pts_field, dts_field) = out[9 .. 19].split_at(5);
    assert_eq!(decode_timestamp(pts_field), expected_pts);
    assert_eq!(decode_timestamp(dts_field), expected_dts);
}
/// Enabling data alignment on the header sets bit 2 (mask 0x04) of byte 6
/// in the serialized output.
#[test]
fn test_pes_header_data_alignment() {
    let aligned = PesHeader::new(STREAM_ID_VIDEO).with_data_alignment(true);
    let mut out = [0u8; 32];
    aligned.write(&mut out);
    // Masking with a single bit yields either 0 or the bit itself.
    assert_ne!(out[6] & 0x04, 0);
}
/// The largest timestamp value (`Timestamp::MAX`) survives a write/decode
/// round trip through the 5-byte PTS field unchanged.
#[test]
fn test_pes_header_pts_max_value() {
    let max_pts = Timestamp::MAX;
    let mut out = [0u8; 32];
    PesHeader::new(STREAM_ID_VIDEO)
        .with_pts_dts(PtsDts::new(max_pts))
        .write(&mut out);
    // The PTS field sits directly after the 9-byte fixed prefix.
    assert_eq!(decode_timestamp(&out[9 .. 14]), max_pts);
}
/// A frame that fits into one TS packet is emitted as a single packet padded
/// with adaptation-field stuffing.
///
/// Layout checked for a 100-byte payload with a PTS-only PES header:
/// 4-byte TS header, AF length byte, AF flags byte, stuffing bytes (0xFF),
/// 14-byte PES header, then the payload running to the end of the packet.
#[test]
fn test_packetizer_single_packet() {
    let mut packetizer = PesPacketizer::new(0x100);
    let mut packet = [0u8; PACKET_SIZE];
    let payload = vec![0xAB; 100];
    // The header is moved straight into the frame (no clone needed); only the
    // payload is cloned because it is compared against the packet below.
    let frame = EsFrame {
        header: PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(PtsDts::new(90000)),
        payload: payload.clone(),
        rai: false,
    };
    packetizer.set_frame(frame);
    assert!(packetizer.next(&mut packet));

    // TS header basics: sync byte, payload-unit-start flag, PID.
    assert_eq!(packet[0], SYNC_BYTE);
    assert_eq!((packet[1] & 0x40), 0x40, "PUSI should be set");
    let ts_ref = TsPacketRef::from(&packet);
    assert_eq!(ts_ref.pid(), 0x100);

    // 4-byte TS header + AF length byte + AF flags byte.
    let ts_header_size = 4 + 2;
    // 9-byte fixed PES prefix + 5-byte PTS.
    let pes_header_size = 9 + 5;
    // Whatever is left in the packet must be filled with stuffing.
    let stuffing_size = PACKET_SIZE - (ts_header_size + pes_header_size + payload.len());
    let payload_start = ts_header_size + stuffing_size + pes_header_size;

    let has_af = (packet[3] & 0x20) != 0;
    assert!(has_af, "Adaptation field should be present for stuffing");
    // The AF length counts the flags byte plus the stuffing, not itself.
    assert_eq!(packet[4] as usize, stuffing_size + 1);
    for &b in &packet[ts_header_size .. ts_header_size + stuffing_size] {
        assert_eq!(b, 0xFF, "Stuffing bytes should be 0xFF");
    }

    let has_payload = (packet[3] & 0x10) != 0;
    assert!(has_payload, "Payload flag should be set");
    assert_eq!(&packet[payload_start ..], payload.as_slice());

    // The frame is fully consumed after one packet.
    assert!(!packetizer.next(&mut packet));
}
/// Single-packet frames flagged `rai` carry the 0x40 bit in the adaptation
/// field flags byte, both when the frame fills the packet exactly and when
/// stuffing has to be added.
///
/// Note: in both cases the packet is inspected after the second (exhausted)
/// `next` call, so the checks also require that call to leave the buffer
/// contents intact.
#[test]
fn test_packetizer_single_packet_with_rai() {
    let mut packetizer = PesPacketizer::new(0x100);
    let mut ts_packet = [0u8; PACKET_SIZE];
    let pes_header = PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(PtsDts::new(90000));

    // Case 1: 168 payload bytes, expected to leave no room for stuffing.
    packetizer.set_frame(EsFrame {
        header: pes_header.clone(),
        payload: vec![0xAB; 168],
        rai: true,
    });
    assert!(packetizer.next(&mut ts_packet));
    assert!(!packetizer.next(&mut ts_packet), "Should fit in one packet");
    let af_present = ts_packet[3] & 0x20 != 0;
    assert!(af_present, "AF should be present for RAI");
    assert_eq!(ts_packet[4], 1, "AF length should be 1 (flags byte only)");
    assert_eq!(ts_packet[5] & 0x40, 0x40, "RAI should be set");

    // Case 2: 100 payload bytes, so the adaptation field also holds stuffing.
    packetizer.set_frame(EsFrame {
        header: pes_header,
        payload: vec![0xAB; 100],
        rai: true,
    });
    assert!(packetizer.next(&mut ts_packet));
    assert!(!packetizer.next(&mut ts_packet), "Should fit in one packet");
    let af_present = ts_packet[3] & 0x20 != 0;
    assert!(af_present, "AF should be present for RAI with stuffing");
    assert_eq!(
        ts_packet[4], 69,
        "AF length should be 69 (1 for RAI flags + 68 stuffing)"
    );
    assert_eq!(
        ts_packet[5] & 0x40,
        0x40,
        "RAI should be set with stuffing"
    );
}
/// A 500-byte payload with a PTS-only header is split across three packets:
/// only the first has PUSI set, the continuity counter runs 0, 1, 2, and the
/// last packet is padded through a flag-less adaptation field.
#[test]
fn test_packetizer_multiple_packets() {
    let mut packetizer = PesPacketizer::new(0x200);
    packetizer.set_frame(EsFrame {
        header: PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(PtsDts::new(90000)),
        payload: vec![0xCD; 500],
        rai: false,
    });
    let mut pkt = [0u8; PACKET_SIZE];

    // Packet 1: start of the PES packet.
    assert!(packetizer.next(&mut pkt));
    let first = TsPacketRef::from(&pkt);
    assert!(first.is_payload_start(), "First packet should have PUSI");
    assert_eq!(first.cc(), 0, "CC should start at 0");

    // Packet 2: plain continuation.
    assert!(packetizer.next(&mut pkt));
    let second = TsPacketRef::from(&pkt);
    assert!(!second.is_payload_start(), "No PUSI in continuation packet");
    assert_eq!(second.cc(), 1, "CC should increment to 1");

    // Packet 3: the tail of the frame, padded with stuffing.
    assert!(packetizer.next(&mut pkt));
    let third = TsPacketRef::from(&pkt);
    assert!(!third.is_payload_start(), "No PUSI in last packet");
    assert_eq!(third.cc(), 2, "CC should increment to 2");

    let af_present = (pkt[3] & 0x20) != 0;
    assert!(af_present, "AF should be present in last packet for stuffing");
    let af_length = pkt[4] as usize;
    assert_eq!(af_length, 37, "AF length should be 37");
    assert_eq!(pkt[5], 0, "No flags should be set in AF");

    // Everything in the adaptation field after the flags byte is stuffing.
    let stuffing_len = af_length - 1;
    assert!(
        pkt[6 .. 6 + stuffing_len].iter().all(|&b| b == 0xFF),
        "Stuffing bytes should be 0xFF"
    );

    // Nothing is left once the third packet has been produced.
    assert!(!packetizer.next(&mut pkt));
}
/// For a three-packet frame flagged `rai`, only the first packet carries the
/// RAI flag; the second has no adaptation field and the third uses one
/// purely for stuffing.
#[test]
fn test_packetizer_multiple_packets_with_rai() {
    let mut packetizer = PesPacketizer::new(0x200);
    packetizer.set_frame(EsFrame {
        header: PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(PtsDts::new(90000)),
        payload: vec![0xCD; 500],
        rai: true,
    });
    let mut pkt = [0u8; PACKET_SIZE];

    // Packet 1: adaptation field holding just the flags byte, with RAI set.
    assert!(packetizer.next(&mut pkt));
    let af_present = pkt[3] & 0x20 != 0;
    assert!(af_present, "First packet should have AF");
    assert_eq!(pkt[4], 1, "AF length should be 1 for RAI");
    assert_eq!(pkt[5], 0x40, "First packet should have RAI");

    // Packet 2: payload only.
    assert!(packetizer.next(&mut pkt));
    let af_absent = pkt[3] & 0x20 == 0;
    assert!(af_absent, "Second packet should not have AF");

    // Packet 3: the two extra AF bytes spent in packet 1 leave more payload
    // for the tail than in the non-RAI case, hence the shorter AF here.
    assert!(packetizer.next(&mut pkt));
    let af_present = pkt[3] & 0x20 != 0;
    assert!(af_present, "Third packet should have AF for stuffing");
    assert_eq!(pkt[4], 35, "Third packet AF length should be 35");
    assert_eq!(pkt[5], 0, "Third packet should not have flags");

    // The frame is exhausted after three packets.
    assert!(!packetizer.next(&mut pkt));
}
/// Twenty single-packet frames in a row must produce continuity counters
/// 0..=15 followed by 0..=3, i.e. the counter wraps modulo 16.
#[test]
fn test_packetizer_cc_wrap() {
    let mut packetizer = PesPacketizer::new(0x100);
    for frame_no in 0u8 .. 20 {
        packetizer.set_frame(EsFrame {
            header: PesHeader::new(STREAM_ID_VIDEO),
            payload: vec![0u8; 10],
            rai: false,
        });

        // Each tiny frame yields exactly one packet.
        let mut pkt = [0u8; PACKET_SIZE];
        assert!(packetizer.next(&mut pkt));
        assert!(!packetizer.next(&mut pkt));

        let expected_cc = frame_no % 16;
        assert_eq!(
            TsPacketRef::from(&pkt).cc(),
            expected_cc,
            "CC should increment and wrap at 16"
        );
    }
}
/// The continuity counter keeps incrementing (modulo 16) from one frame to
/// the next instead of restarting whenever a new frame is set.
#[test]
fn test_packetizer_cc_continuous_across_frames() {
    let mut packetizer = PesPacketizer::new(0x100);
    // Counter of the most recently seen packet, across all frames.
    let mut last_cc: Option<u8> = None;

    for _ in 0 .. 5 {
        packetizer.set_frame(EsFrame {
            header: PesHeader::new(STREAM_ID_VIDEO).with_pts_dts(PtsDts::new(90000)),
            payload: vec![0xAA; 500],
            rai: false,
        });

        let mut pkt = [0u8; PACKET_SIZE];
        while packetizer.next(&mut pkt) {
            let cc = TsPacketRef::from(&pkt).cc();
            // Store the new counter and compare against the one it replaces;
            // the very first packet has no predecessor to compare with.
            if let Some(prev) = last_cc.replace(cc) {
                assert_eq!(
                    cc,
                    (prev + 1) % 16,
                    "CC should be continuous across frames"
                );
            }
        }
    }
}
/// The PID given to the packetizer is the PID read back from the TS header
/// of the packets it emits.
///
/// 8190 is 0x1FFE — presumably chosen as a near-maximum PID so that the
/// upper bits of the field are exercised.
#[test]
fn test_packetizer_pid() {
    let mut packetizer = PesPacketizer::new(8190);
    packetizer.set_frame(EsFrame {
        header: PesHeader::new(STREAM_ID_VIDEO),
        payload: vec![0u8; 10],
        rai: false,
    });

    let mut packet = [0u8; PACKET_SIZE];
    // Make sure a packet was actually produced; otherwise the PID check below
    // would be reading the zero-initialised buffer.
    assert!(
        packetizer.next(&mut packet),
        "packetizer should emit a packet for the frame"
    );

    let ts_ref = TsPacketRef::from(&packet);
    assert_eq!(ts_ref.pid(), 8190);
}
/// `wrapping_add` / `wrapping_sub` behave like plain arithmetic inside the
/// range and wrap around between `Timestamp::MAX` and 0 at the edges.
#[test]
fn test_timestamp_wrapping() {
    // Well inside the range: no wrap in either direction.
    let mid = Timestamp::new(1000);
    assert_eq!(mid.wrapping_add(500.into()).value(), 1500);
    assert_eq!(mid.wrapping_sub(400.into()).value(), 600);

    // At the top: adding n lands on n - 1.
    let top = Timestamp::new(Timestamp::MAX);
    assert_eq!(top.wrapping_add(1.into()).value(), 0);
    assert_eq!(top.wrapping_add(100.into()).value(), 99);

    // At the bottom: subtracting n lands on MAX - (n - 1).
    let bottom = Timestamp::new(0);
    assert_eq!(bottom.wrapping_sub(1.into()).value(), Timestamp::MAX);
    assert_eq!(
        bottom.wrapping_sub(100.into()).value(),
        Timestamp::MAX - 99
    );
}
/// Reassembles the 33-bit value stored in a 5-byte timestamp field.
///
/// Bit layout consumed from `buf`:
/// - byte 0, bits 3..=1: value bits 32..=30 (the other bits are ignored),
/// - bytes 1-2 as a big-endian pair, low bit dropped: value bits 29..=15,
/// - bytes 3-4 as a big-endian pair, low bit dropped: value bits 14..=0.
///
/// The skipped low bits are not validated. Bytes beyond the fifth are
/// ignored.
///
/// # Panics
///
/// Panics if `buf` holds fewer than 5 bytes.
fn decode_timestamp(buf: &[u8]) -> u64 {
    let top = u64::from((buf[0] >> 1) & 0x07);
    // Each 16-bit pair carries 15 value bits above one trailing bit.
    let upper = ((u64::from(buf[1]) << 8) | u64::from(buf[2])) >> 1;
    let lower = ((u64::from(buf[3]) << 8) | u64::from(buf[4])) >> 1;
    (top << 30) | (upper << 15) | lower
}