// Size in bytes of one input packet as consumed by `TsDemuxer::feed`: a 188-byte
// transport-stream packet preceded by 4 bytes that the demuxer skips
// (presumably the BD/M2TS arrival-timestamp prefix — confirm against the container spec).
const BD_TS_PACKET_SIZE: usize = 192;
// Size in bytes of the transport-stream packet proper (sync byte included).
const TS_PACKET_SIZE: usize = 188;
// Value expected in the first byte of every transport-stream packet.
const SYNC_BYTE: u8 = 0x47;
/// One reassembled PES packet produced by the demuxer.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that callers can duplicate
/// packets and compare them directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PesPacket {
    /// PID of the transport-stream packets this payload was collected from.
    pub pid: u16,
    /// Presentation timestamp taken from the PES header, if one was parsed.
    pub pts: Option<i64>,
    /// Decoding timestamp taken from the PES header, if one was parsed.
    pub dts: Option<i64>,
    /// Payload bytes accumulated for this packet.
    pub data: Vec<u8>,
}
// Accumulates the payload of a single PID until a complete packet can be emitted.
struct PesAssembler {
// PID copied into every packet this assembler emits.
pid: u16,
// Payload bytes collected since the last `start`.
buffer: Vec<u8>,
// Timestamps passed to the most recent `start`; attached to the packet
// that is emitted when the buffer is handed out.
pts: Option<i64>,
dts: Option<i64>,
// Set by `start`, cleared by a successful `flush`; `push` drops data while false.
active: bool,
}
impl PesAssembler {
    /// Capacity reserved up front for the first packet of a stream.
    const INITIAL_CAPACITY: usize = 256 * 1024;

    /// Creates an idle assembler for `pid`.
    ///
    /// The assembler ignores pushed data until [`start`](Self::start) is called.
    fn new(pid: u16) -> Self {
        Self {
            pid,
            buffer: Vec::with_capacity(Self::INITIAL_CAPACITY),
            pts: None,
            dts: None,
            active: false,
        }
    }

    /// Begins a new packet carrying the given timestamps.
    ///
    /// Returns the previously accumulated packet (with the timestamps it was
    /// started with) if the assembler was active and had collected any data;
    /// otherwise returns `None`.
    fn start(&mut self, pts: Option<i64>, dts: Option<i64>) -> Option<PesPacket> {
        let completed = if self.active && !self.buffer.is_empty() {
            // The finished buffer is handed to the caller, so whatever capacity
            // it has stays pinned for as long as the packet lives. Size the
            // replacement after the packet just completed instead of reserving
            // a fixed 256 KiB each time, which would make every small packet
            // hold on to a large allocation.
            let next = Vec::with_capacity(self.buffer.len());
            Some(PesPacket {
                pid: self.pid,
                pts: self.pts,
                dts: self.dts,
                data: std::mem::replace(&mut self.buffer, next),
            })
        } else {
            self.buffer.clear();
            None
        };
        self.pts = pts;
        self.dts = dts;
        self.active = true;
        completed
    }

    /// Appends `data` to the packet in progress; ignored while no packet has
    /// been started.
    fn push(&mut self, data: &[u8]) {
        if self.active {
            self.buffer.extend_from_slice(data);
        }
    }

    /// Emits the packet in progress, if it holds any data, and returns the
    /// assembler to the idle state.
    ///
    /// Returns `None` (and changes nothing) when idle or when no data has been
    /// collected since the last `start`.
    fn flush(&mut self) -> Option<PesPacket> {
        if self.active && !self.buffer.is_empty() {
            self.active = false;
            Some(PesPacket {
                pid: self.pid,
                pts: self.pts,
                dts: self.dts,
                data: std::mem::take(&mut self.buffer),
            })
        } else {
            None
        }
    }
}
/// Demultiplexes 192-byte transport-stream packets into per-PID PES packets.
pub struct TsDemuxer {
// One assembler per PID registered in `new`.
assemblers: Vec<PesAssembler>,
// Lookup table indexed by PID: position in `assemblers`, or -1 when the PID is not tracked.
pid_index: [i16; 8192], }
impl TsDemuxer {
    /// Creates a demuxer that reassembles PES packets for the given PIDs.
    ///
    /// PIDs are 13-bit values; entries above `0x1FFF` can never match a packet
    /// and are ignored instead of panicking. Repeated PIDs are registered once.
    pub fn new(pids: &[u16]) -> Self {
        let mut pid_index = [-1i16; 8192];
        let mut assemblers = Vec::with_capacity(pids.len());
        for &pid in pids {
            let slot = match pid_index.get_mut(usize::from(pid)) {
                Some(slot) => slot,
                None => continue, // out of the 13-bit PID range
            };
            if *slot >= 0 {
                continue; // already registered
            }
            // At most 8192 distinct in-range PIDs exist, so the index always
            // fits in an i16.
            *slot = assemblers.len() as i16;
            assemblers.push(PesAssembler::new(pid));
        }
        Self { assemblers, pid_index }
    }

    /// Consumes a run of 192-byte packets and returns every PES packet that
    /// was completed by it.
    ///
    /// `data` must start on a packet boundary. A trailing partial packet is
    /// ignored (it is not carried over to the next call), as is any packet
    /// whose sync byte is wrong or whose PID is not tracked. A PES packet is
    /// only emitted when the next one starts on the same PID; call
    /// [`flush`](Self::flush) at end of stream to obtain the remainder.
    pub fn feed(&mut self, data: &[u8]) -> Vec<PesPacket> {
        let mut completed = Vec::new();
        for packet in data.chunks_exact(BD_TS_PACKET_SIZE) {
            // Skip the bytes that precede the 188-byte transport packet.
            let ts = &packet[BD_TS_PACKET_SIZE - TS_PACKET_SIZE..];
            if ts[0] != SYNC_BYTE {
                continue;
            }
            let pid = (u16::from(ts[1] & 0x1F) << 8) | u16::from(ts[2]);
            let pusi = ts[1] & 0x40 != 0;
            let adaptation = (ts[3] >> 4) & 0x03;

            let idx = self.pid_index[usize::from(pid)];
            if idx < 0 {
                continue;
            }
            let asm = &mut self.assemblers[idx as usize];

            let payload_start = match adaptation {
                // Payload only.
                0b01 => 4,
                // Adaptation field (length byte + body) followed by payload.
                0b11 => 5 + usize::from(ts[4]),
                // 0b10: adaptation field only, no payload.
                // 0b00: reserved; ISO/IEC 13818-1 requires decoders to
                // discard such packets rather than read them as payload.
                _ => continue,
            };
            let payload = match ts.get(payload_start..) {
                Some(bytes) if !bytes.is_empty() => bytes,
                // Adaptation field fills (or claims to overrun) the packet.
                _ => continue,
            };

            if pusi {
                let (pts, dts, pes_data_start) = parse_pes_header(payload);
                if let Some(prev) = asm.start(pts, dts) {
                    completed.push(prev);
                }
                if pes_data_start < payload.len() {
                    asm.push(&payload[pes_data_start..]);
                }
            } else {
                asm.push(payload);
            }
        }
        completed
    }

    /// Emits the packet in progress on every PID, in registration order.
    ///
    /// Intended for end of stream, where no further packet start will arrive
    /// to terminate the data collected so far.
    pub fn flush(&mut self) -> Vec<PesPacket> {
        self.assemblers
            .iter_mut()
            .filter_map(|asm| asm.flush())
            .collect()
    }
}
/// Parses the PES header at the start of `data`.
///
/// Returns `(pts, dts, payload_offset)`, where `payload_offset` is the number
/// of leading bytes of `data` that belong to the header.
///
/// * If `data` is shorter than the 6-byte fixed header or does not begin with
///   the `00 00 01` start code, the result is `(None, None, 0)`.
/// * Stream IDs that carry no optional header fields, and inputs too short to
///   hold those fields, yield `(None, None, 6)`.
/// * Otherwise the offset is `9 + PES_header_data_length`, which may exceed
///   `data.len()` when the header continues past this slice; callers must
///   check it before slicing.
fn parse_pes_header(data: &[u8]) -> (Option<i64>, Option<i64>, usize) {
    // Start code (3) + stream_id (1) + PES_packet_length (2).
    const FIXED_HEADER_LEN: usize = 6;
    // Fixed header + two flag bytes + PES_header_data_length.
    const OPTIONAL_FIELDS_START: usize = 9;

    if data.len() < FIXED_HEADER_LEN || !data.starts_with(&[0x00, 0x00, 0x01]) {
        return (None, None, 0);
    }

    // Stream IDs defined by ISO/IEC 13818-1 to have no optional PES header:
    // program_stream_map, padding_stream, private_stream_2, ECM, EMM,
    // DSMCC_stream, H.222.1 type E and program_stream_directory.
    match data[3] {
        0xBC | 0xBE | 0xBF | 0xF0 | 0xF1 | 0xF2 | 0xF8 | 0xFF => {
            return (None, None, FIXED_HEADER_LEN);
        }
        _ => {}
    }

    if data.len() < OPTIONAL_FIELDS_START {
        // Flags and header length are not available in this slice.
        return (None, None, FIXED_HEADER_LEN);
    }

    let pts_dts_flags = (data[7] >> 6) & 0x03;
    let header_data_len = usize::from(data[8]);
    let data_start = OPTIONAL_FIELDS_START + header_data_len;

    // '10' = PTS only, '11' = PTS followed by DTS; each timestamp is 5 bytes.
    let pts = if pts_dts_flags >= 2 && data.len() >= 14 {
        Some(parse_timestamp(&data[9..14]))
    } else {
        None
    };
    let dts = if pts_dts_flags == 3 && data.len() >= 19 {
        Some(parse_timestamp(&data[14..19]))
    } else {
        None
    };

    (pts, dts, data_start)
}
/// Decodes a 33-bit PES timestamp from its 5-byte encoding.
///
/// The value is stored as three groups (3, 15 and 15 bits), each followed by a
/// marker bit; the upper four bits of the first byte are a prefix that is not
/// part of the value. Marker and prefix bits are not validated.
///
/// Panics if `data` holds fewer than 5 bytes.
fn parse_timestamp(data: &[u8]) -> i64 {
    let byte = |i: usize| i64::from(data[i]);

    // Bits 32..30 sit in bits 3..1 of the first byte.
    let high = (byte(0) >> 1) & 0x07;
    // Bits 29..15 and 14..0 are each a 16-bit word with a trailing marker bit.
    let middle = ((byte(1) << 8) | byte(2)) >> 1;
    let low = ((byte(3) << 8) | byte(4)) >> 1;

    (high << 30) | (middle << 15) | low
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp decoding is checked against hand-computed values; asserting
    /// only non-negativity would be vacuous because the decoder cannot return
    /// a negative number.
    #[test]
    fn test_parse_timestamp() {
        // Only marker bits set: value 0.
        let data = [0x21, 0x00, 0x01, 0x00, 0x01];
        assert_eq!(parse_timestamp(&data), 0);

        // (0x07 >> 1) << 15 | 0xE9 << 7 = 98_304 + 29_824.
        let data2 = [0x21, 0x00, 0x07, 0xE9, 0x01];
        assert_eq!(parse_timestamp(&data2), 128_128);

        // One second at 90 kHz.
        let one_second = [0x21, 0x00, 0x05, 0xBF, 0x21];
        assert_eq!(parse_timestamp(&one_second), 90_000);

        // Every bit set: the largest 33-bit value.
        let max = [0xFF; 5];
        assert_eq!(parse_timestamp(&max), (1i64 << 33) - 1);
    }

    /// Feeding no data yields no packets, and there is nothing to flush.
    #[test]
    fn test_demuxer_empty() {
        let mut demux = TsDemuxer::new(&[0x1011]);
        let result = demux.feed(&[]);
        assert!(result.is_empty());
        assert!(demux.flush().is_empty());
    }
}