/// Start-code id (fourth byte after the `00 00 01` prefix) of a pack header.
const PACK_HEADER_ID: u8 = 0xBA;
/// Start-code id of a system header.
const SYSTEM_HEADER_ID: u8 = 0xBB;
/// Start-code id of the program end code.
const PROGRAM_END_ID: u8 = 0xB9;
/// Stream id of private stream 1, whose payload begins with a sub-stream id byte.
const PRIVATE_STREAM_1: u8 = 0xBD;
/// One demultiplexed PES packet: identification, optional timestamps and payload.
///
/// Derives `PartialEq`/`Eq` so packets can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PsPacket {
    /// Stream id taken from the fourth byte of the packet start code.
    pub stream_id: u8,
    /// First payload byte of a private-stream-1 packet; `None` for every other stream.
    pub sub_stream_id: Option<u8>,
    /// Presentation timestamp (33-bit value) decoded from the PES header, if present.
    pub pts: Option<u64>,
    /// Decoding timestamp (33-bit value) decoded from the PES header, if present.
    pub dts: Option<u64>,
    /// Payload bytes following the PES header (and the sub-stream header, if any).
    pub data: Vec<u8>,
}
/// Incremental demuxer: bytes are appended with `feed` and complete PES
/// packets are returned as soon as they are fully buffered.
#[derive(Debug)]
pub struct PsDemuxer {
    /// Bytes received so far that have not yet been consumed as complete units.
    buffer: Vec<u8>,
}
impl Default for PsDemuxer {
fn default() -> Self {
Self::new()
}
}
impl PsDemuxer {
    /// Lowest start-code id that belongs to the system layer (program end
    /// code, pack/system headers, PES stream ids).
    ///
    /// Start codes with a smaller id are elementary-stream start codes (for
    /// example MPEG video picture/slice codes) and may legally appear inside a
    /// PES payload, so they must not be treated as a packet boundary.
    const MIN_SYSTEM_START_CODE: u8 = 0xB9;

    /// Creates an empty demuxer with a 64 KiB pre-allocated reassembly buffer.
    pub fn new() -> Self {
        Self {
            buffer: Vec::with_capacity(64 * 1024),
        }
    }

    /// Appends `data` to the internal buffer and returns every PES packet that
    /// is now completely buffered. Incomplete trailing data is kept for the
    /// next call.
    pub fn feed(&mut self, data: &[u8]) -> Vec<PsPacket> {
        self.buffer.extend_from_slice(data);
        self.extract_packets()
    }

    /// Extracts any remaining complete packets and then discards whatever is
    /// left in the buffer, including an incomplete trailing unit.
    pub fn flush(&mut self) -> Vec<PsPacket> {
        let packets = self.extract_packets();
        self.buffer.clear();
        packets
    }

    /// Walks the buffer start code by start code, collecting complete PES
    /// packets and skipping pack headers, system headers and unknown units.
    ///
    /// Consumed bytes are drained from the buffer. Bytes that precede a start
    /// code can never become part of a unit, so they are dropped as well; this
    /// keeps the buffer bounded when the input contains junk.
    fn extract_packets(&mut self) -> Vec<PsPacket> {
        let mut packets = Vec::with_capacity(4);
        let mut pos = 0;
        loop {
            let sc = match find_start_code(&self.buffer, pos) {
                Some(sc) => sc,
                None => {
                    // No start code in the remaining bytes. Only the last two
                    // bytes could still be the beginning of a `00 00 01`
                    // prefix completed by the next `feed`; drop the rest.
                    pos = pos.max(self.buffer.len().saturating_sub(2));
                    break;
                }
            };
            // Everything before the start code is unusable; if the unit is
            // incomplete we resume exactly at its start code next time.
            pos = sc;
            let end = match self.unit_end(sc) {
                Some(end) => end,
                None => break,
            };
            // `unit_end` returned `Some`, so the id byte at `sc + 3` exists.
            if is_pes_stream_id(self.buffer[sc + 3]) {
                if let Some(pkt) = parse_pes_packet(&self.buffer[sc..end]) {
                    packets.push(pkt);
                }
            }
            pos = end;
        }
        if pos > 0 {
            self.buffer.drain(..pos);
        }
        packets
    }

    /// Returns the exclusive end index of the unit whose start code begins at
    /// `sc`, or `None` when more data is needed to determine or reach it.
    fn unit_end(&self, sc: usize) -> Option<usize> {
        let buf = self.buffer.as_slice();
        let code = *buf.get(sc + 3)?;
        let end = match code {
            PROGRAM_END_ID => sc + 4,
            PACK_HEADER_ID => {
                // The low three bits of the 14th header byte give the number
                // of stuffing bytes that follow the fixed 14-byte header.
                let stuffing = usize::from(*buf.get(sc + 13)? & 0x07);
                sc + 14 + stuffing
            }
            SYSTEM_HEADER_ID => sc + 6 + Self::length_field(buf, sc)?,
            id if is_pes_stream_id(id) => match Self::length_field(buf, sc)? {
                // A zero length field means "unbounded": the packet runs up to
                // the next system-level start code.
                0 => return Self::find_system_start_code(buf, sc + 6),
                len => sc + 6 + len,
            },
            // Unknown id: skip just the start code and resynchronise.
            _ => sc + 4,
        };
        if end <= buf.len() {
            Some(end)
        } else {
            None
        }
    }

    /// Reads the big-endian 16-bit length field that follows the start code
    /// at `sc`; `None` if it is not fully buffered yet.
    fn length_field(buf: &[u8], sc: usize) -> Option<usize> {
        let hi = *buf.get(sc + 4)?;
        let lo = *buf.get(sc + 5)?;
        Some(usize::from(u16::from_be_bytes([hi, lo])))
    }

    /// Finds the next start code at or after `from` whose id is a system-level
    /// id, skipping elementary-stream start codes embedded in the payload.
    ///
    /// Returns `None` when no such start code is buffered yet, including the
    /// case where a prefix was found but its id byte has not arrived.
    fn find_system_start_code(buf: &[u8], from: usize) -> Option<usize> {
        let mut cursor = from;
        while let Some(idx) = find_start_code(buf, cursor) {
            let id = *buf.get(idx + 3)?;
            if id >= Self::MIN_SYSTEM_START_CODE {
                return Some(idx);
            }
            // A new prefix cannot begin before the byte after this one's `01`.
            cursor = idx + 3;
        }
        None
    }
}
/// Reports whether `id` lies in the stream-id range `0xBD..=0xEF` that the
/// demuxer treats as length-prefixed PES packets.
fn is_pes_stream_id(id: u8) -> bool {
    (0xBD..=0xEF).contains(&id)
}
/// Parses one complete PES packet (`data` starts at the `00 00 01` prefix and
/// ends at the packet's last byte).
///
/// Returns `None` when the slice is too short, does not begin with a start
/// code, is a padding stream (`0xBE`), or declares a header that extends past
/// the end of the slice.
///
/// * Stream `0xBF` carries no optional PES header: everything after the
///   6-byte prefix is returned as payload.
/// * For all other streams the optional header is skipped using the
///   header-data-length byte, and PTS/DTS are decoded when flagged.
/// * For private stream 1 the first payload byte is reported as
///   `sub_stream_id` and the sub-stream header is stripped from the payload.
fn parse_pes_packet(data: &[u8]) -> Option<PsPacket> {
    if data.len() < 6 || !data.starts_with(&[0x00, 0x00, 0x01]) {
        return None;
    }
    let stream_id = data[3];
    match stream_id {
        // Padding stream: nothing useful to deliver.
        0xBE => return None,
        0xBF => {
            return Some(PsPacket {
                stream_id,
                sub_stream_id: None,
                pts: None,
                dts: None,
                data: data[6..].to_vec(),
            });
        }
        _ => {}
    }
    if data.len() < 9 {
        return None;
    }
    let pts_dts_flags = (data[7] >> 6) & 0x03;
    let header_data_len = data[8] as usize;
    let header_end = 9 + header_data_len;
    if header_end > data.len() {
        return None;
    }
    // The timestamp fields live inside the optional header area, so they are
    // only trusted when the declared header is long enough to contain them.
    // Checking against `header_end` (not the packet length) prevents payload
    // bytes from being misread as a timestamp in malformed packets.
    let pts = if pts_dts_flags >= 2 && header_end >= 14 {
        Some(parse_pts(&data[9..14]))
    } else {
        None
    };
    let dts = if pts_dts_flags == 3 && header_end >= 19 {
        Some(parse_pts(&data[14..19]))
    } else {
        None
    };
    let payload = &data[header_end..];
    let (sub_stream_id, es_data) = match payload.first() {
        Some(&sub_id) if stream_id == PRIVATE_STREAM_1 => {
            // Bytes to strip, counting the sub-stream id itself: 4 for ids
            // 0x80..=0x8F, 7 for ids 0xA0..=0xA7, otherwise just the id byte.
            let skip: usize = match sub_id {
                0x80..=0x8F => 4,
                0xA0..=0xA7 => 7,
                _ => 1,
            };
            let start = skip.min(payload.len());
            (Some(sub_id), payload[start..].to_vec())
        }
        _ => (None, payload.to_vec()),
    };
    Some(PsPacket {
        stream_id,
        sub_stream_id,
        pts,
        dts,
        data: es_data,
    })
}
/// Decodes the 33-bit timestamp stored in the first five bytes of `buf`.
///
/// The value is split into a 3-bit, a 15-bit and another 15-bit group; the
/// lowest bit of bytes 0, 2 and 4 is a marker bit and is discarded.
///
/// Panics if `buf` holds fewer than five bytes.
fn parse_pts(buf: &[u8]) -> u64 {
    debug_assert!(buf.len() >= 5);
    let top = u64::from((buf[0] >> 1) & 0x07);
    let middle = (u64::from(buf[1]) << 7) | u64::from(buf[2] >> 1);
    let bottom = (u64::from(buf[3]) << 7) | u64::from(buf[4] >> 1);
    (top << 30) | (middle << 15) | bottom
}
/// Returns the index of the first `00 00 01` start-code prefix located at or
/// after `from`, or `None` if `data` contains none there (including when
/// `from` lies beyond the end of `data`).
fn find_start_code(data: &[u8], from: usize) -> Option<usize> {
    let tail = data.get(from..)?;
    tail.windows(3)
        .position(|window| matches!(window, [0x00, 0x00, 0x01]))
        .map(|offset| from + offset)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A pack header without stuffing is skipped and the following PES packet is returned.
    #[test]
    fn detect_pack_header() {
        let mut demuxer = PsDemuxer::new();
        let mut pack = vec![
            0x00, 0x00, 0x01, 0xBA, 0x44, 0x00, 0x04, 0x00, 0x04, 0x01, 0x01, 0x89, 0xC3, 0xF8,
        ];
        pack.extend_from_slice(&[
            0x00, 0x00, 0x01, 0xE0, 0x00, 0x08, 0x80, 0x00, 0x00, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE,
        ]);
        let packets = demuxer.feed(&pack);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xE0);
        assert_eq!(packets[0].data, vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE]);
    }

    /// The stuffing count in the last pack-header byte (here 3) is honoured.
    #[test]
    fn pack_header_with_stuffing() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![
            0x00, 0x00, 0x01, 0xBA, 0x44, 0x00, 0x04, 0x00, 0x04, 0x01, 0x01, 0x89, 0xC3, 0xFB,
            0xFF, 0xFF, 0xFF,
        ];
        data.extend_from_slice(&[
            0x00, 0x00, 0x01, 0xC0, 0x00, 0x05, 0x80, 0x00, 0x00, 0x11, 0x22,
        ]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xC0);
        assert_eq!(packets[0].data, vec![0x11, 0x22]);
    }

    /// A PES header flagged "PTS only" yields a PTS and no DTS.
    #[test]
    fn pes_header_with_pts() {
        let mut demuxer = PsDemuxer::new();
        let pts_bytes = encode_pts(90000, 0x20);
        let mut data = vec![0x00, 0x00, 0x01, 0xE0, 0x00, 0x0D, 0x80, 0x80, 0x05];
        data.extend_from_slice(&pts_bytes);
        data.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0x00]);
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xE0);
        assert_eq!(packets[0].pts, Some(90000));
        assert!(packets[0].dts.is_none());
        assert_eq!(packets[0].data, vec![0xDE, 0xAD, 0xBE, 0xEF, 0x00]);
    }

    /// A PES header flagged "PTS and DTS" yields both timestamps.
    #[test]
    fn pes_header_with_pts_and_dts() {
        let mut demuxer = PsDemuxer::new();
        let pts_bytes = encode_pts(180000, 0x30);
        let dts_bytes = encode_pts(90000, 0x10);
        let mut data = vec![0x00, 0x00, 0x01, 0xE0, 0x00, 0x11, 0x80, 0xC0, 0x0A];
        data.extend_from_slice(&pts_bytes);
        data.extend_from_slice(&dts_bytes);
        data.extend_from_slice(&[0xCA, 0xFE]);
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].pts, Some(180000));
        assert_eq!(packets[0].dts, Some(90000));
    }

    /// Sub-stream 0x80: the 4-byte sub-stream header is stripped from the payload.
    #[test]
    fn private_stream_1_ac3_substream() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![
            0x00, 0x00, 0x01, 0xBD, 0x00, 0x0B, 0x80, 0x00, 0x00, 0x80, 0x01, 0x00, 0x02, 0xAA,
            0xBB, 0xCC, 0xDD,
        ];
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xBD);
        assert_eq!(packets[0].sub_stream_id, Some(0x80));
        assert_eq!(packets[0].data, vec![0xAA, 0xBB, 0xCC, 0xDD]);
    }

    /// Sub-stream 0x88 shares the 4-byte sub-stream header handling of 0x80..=0x8F.
    #[test]
    fn private_stream_1_dts_substream() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![
            0x00, 0x00, 0x01, 0xBD, 0x00, 0x09, 0x80, 0x00, 0x00, 0x88, 0x01, 0x00, 0x00, 0x11,
            0x22,
        ];
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].sub_stream_id, Some(0x88));
        assert_eq!(packets[0].data, vec![0x11, 0x22]);
    }

    /// Any other sub-stream id: only the id byte itself is stripped.
    #[test]
    fn private_stream_1_subtitle_substream() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![
            0x00, 0x00, 0x01, 0xBD, 0x00, 0x06, 0x80, 0x00, 0x00, 0x20, 0xFF, 0xFE,
        ];
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xBD);
        assert_eq!(packets[0].sub_stream_id, Some(0x20));
        assert_eq!(packets[0].data, vec![0xFF, 0xFE]);
    }

    /// Sub-stream 0xA0: the 7-byte sub-stream header is stripped. The payload
    /// deliberately contains a `00 00 01` sequence, which must not split the
    /// packet because its length field is explicit.
    #[test]
    fn private_stream_1_lpcm_substream() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![
            0x00, 0x00, 0x01, 0xBD, 0x00, 0x0C, 0x80, 0x00, 0x00, 0xA0, 0x01, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x01, 0x02,
        ];
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].sub_stream_id, Some(0xA0));
        assert_eq!(packets[0].data, vec![0x01, 0x02]);
    }

    /// Private stream 2 (0xBF) has no optional PES header: the payload starts
    /// right after the length field and no sub-stream id is reported.
    #[test]
    fn private_stream_2_payload_follows_length_field() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![0x00, 0x00, 0x01, 0xBF, 0x00, 0x02, 0xAB, 0xCD];
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xBF);
        assert_eq!(packets[0].sub_stream_id, None);
        assert!(packets[0].pts.is_none());
        assert!(packets[0].dts.is_none());
        assert_eq!(packets[0].data, vec![0xAB, 0xCD]);
    }

    /// A padding stream (0xBE) is consumed but produces no packet.
    #[test]
    fn padding_stream_is_dropped() {
        let mut demuxer = PsDemuxer::new();
        let mut data = vec![0x00, 0x00, 0x01, 0xBE, 0x00, 0x02, 0xFF, 0xFF];
        data.extend_from_slice(&[
            0x00, 0x00, 0x01, 0xC0, 0x00, 0x05, 0x80, 0x00, 0x00, 0x11, 0x22,
        ]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].stream_id, 0xC0);
        assert_eq!(packets[0].data, vec![0x11, 0x22]);
    }

    /// A packet split across two `feed` calls is only emitted once complete.
    #[test]
    fn incremental_feed() {
        let mut demuxer = PsDemuxer::new();
        let mut full = vec![
            0x00, 0x00, 0x01, 0xE0, 0x00, 0x06, 0x80, 0x00, 0x00, 0xAA, 0xBB, 0xCC,
        ];
        full.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let mid = full.len() / 2;
        let p1 = demuxer.feed(&full[..mid]);
        assert!(p1.is_empty(), "first half should not produce packets");
        let p2 = demuxer.feed(&full[mid..]);
        assert_eq!(p2.len(), 1);
        assert_eq!(p2[0].data, vec![0xAA, 0xBB, 0xCC]);
    }

    /// Back-to-back PES packets are returned in stream order.
    #[test]
    fn multiple_pes_packets() {
        let mut demuxer = PsDemuxer::new();
        let mut data = Vec::new();
        data.extend_from_slice(&[
            0x00, 0x00, 0x01, 0xE0, 0x00, 0x05, 0x80, 0x00, 0x00, 0x11, 0x22,
        ]);
        data.extend_from_slice(&[
            0x00, 0x00, 0x01, 0xC0, 0x00, 0x05, 0x80, 0x00, 0x00, 0x33, 0x44,
        ]);
        data.extend_from_slice(&[0x00, 0x00, 0x01, 0xB9]);
        let packets = demuxer.feed(&data);
        assert_eq!(packets.len(), 2);
        assert_eq!(packets[0].stream_id, 0xE0);
        assert_eq!(packets[1].stream_id, 0xC0);
    }

    /// Round trip of the smallest timestamp.
    #[test]
    fn pts_zero() {
        let pts = parse_pts(&encode_pts(0, 0x20));
        assert_eq!(pts, 0);
    }

    /// Round trip of a timestamp that uses all of the low 32 bits.
    #[test]
    fn pts_large_value() {
        let val: u64 = (1 << 32) - 1;
        let encoded = encode_pts(val, 0x20);
        let decoded = parse_pts(&encoded);
        assert_eq!(decoded, val);
    }

    /// Test helper: packs `pts` into the 5-byte layout read by `parse_pts`,
    /// with `marker_prefix` supplying the high bits of the first byte and a
    /// marker bit set in the lowest bit of bytes 0, 2 and 4.
    fn encode_pts(pts: u64, marker_prefix: u8) -> [u8; 5] {
        let mut buf = [0u8; 5];
        buf[0] = marker_prefix | (((pts >> 30) as u8) & 0x07) << 1 | 1;
        buf[1] = ((pts >> 22) & 0xFF) as u8;
        buf[2] = (((pts >> 15) & 0x7F) as u8) << 1 | 1;
        buf[3] = ((pts >> 7) & 0xFF) as u8;
        buf[4] = (((pts) & 0x7F) as u8) << 1 | 1;
        buf
    }
}