use crate::CodecId;
use bytes::Bytes;
/// Kind of elementary stream a demuxed track carries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TsTrackKind {
    /// The track carries video.
    Video,
    /// The track carries audio.
    Audio,
}
/// One unit of elementary-stream data emitted by the demuxer.
///
/// A value is produced each time a track finishes assembling the payload of a
/// PES packet.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TsPayload {
    /// Elementary-stream bytes collected for this unit (PES header removed).
    pub data: Bytes,
    /// Codec of the track the data came from.
    pub codec: CodecId,
    /// Whether the data came from the video or the audio track.
    pub kind: TsTrackKind,
    /// Presentation timestamp in milliseconds (the PES PTS divided by 90).
    pub pts_ms: i64,
    /// `true` when this is video data in which a keyframe NAL unit was found.
    pub keyframe: bool,
}
/// Reassembly state for one elementary stream.
#[derive(Debug)]
struct Track {
    /// PID whose packets feed this track.
    pid: u16,
    /// Codec announced for this PID.
    codec: CodecId,
    /// Video or audio.
    kind: TsTrackKind,
    /// Elementary-stream bytes gathered for the PES packet being assembled.
    pes: Vec<u8>,
    /// PTS taken from the header of the PES packet being assembled.
    pts: i64,
    /// `true` while a PES packet with a valid header is being assembled.
    open: bool,
}
impl Track {
fn new(pid: u16, codec: CodecId, kind: TsTrackKind) -> Self {
Self {
pid,
codec,
kind,
pes: Vec::new(),
pts: 0,
open: false,
}
}
fn feed(&mut self, payload: &[u8], pusi: bool, out: &mut Vec<TsPayload>) {
if pusi {
self.flush(out);
if let Some((pts, es_offset)) = parse_pes_header(payload) {
self.pts = pts;
self.open = true;
self.pes.extend_from_slice(&payload[es_offset..]);
}
} else if self.open {
self.pes.extend_from_slice(payload);
}
}
fn flush(&mut self, out: &mut Vec<TsPayload>) {
if !self.open || self.pes.is_empty() {
self.pes.clear();
self.open = false;
return;
}
let es = std::mem::take(&mut self.pes);
let keyframe = matches!(self.kind, TsTrackKind::Video) && is_keyframe(&es, self.codec);
out.push(TsPayload {
data: Bytes::from(es),
codec: self.codec,
kind: self.kind,
pts_ms: self.pts / 90,
keyframe,
});
self.open = false;
}
}
/// Number of bytes in one transport-stream packet; input is consumed in units of this size.
const TS_PACKET_LEN: usize = 188;
/// Value the first byte of a packet must have; used to find and keep packet alignment.
const TS_SYNC: u8 = 0x47;
/// Incremental transport-stream demuxer.
///
/// Bytes are supplied in arbitrary chunks; the demuxer follows the PAT to the
/// PMT, picks at most one video and one audio track from it, and reassembles
/// their PES payloads.
#[derive(Debug)]
pub struct TsDemuxer {
    /// PID on which PMT sections are expected, once a PAT has announced it.
    pmt_pid: Option<u16>,
    /// First video track found in the PMT, if any.
    video: Option<Track>,
    /// First audio track found in the PMT, if any.
    audio: Option<Track>,
    /// Trailing input bytes that did not yet form a whole packet.
    carry: Vec<u8>,
}
impl Default for TsDemuxer {
fn default() -> Self {
Self::new()
}
}
impl TsDemuxer {
    /// Creates a demuxer that knows no PMT PID, has no tracks and holds no buffered bytes.
    pub fn new() -> Self {
        Self {
            pmt_pid: None,
            video: None,
            audio: None,
            carry: Vec::new(),
        }
    }

    /// Consumes a chunk of transport-stream bytes and returns the units it completed.
    ///
    /// `bytes` may begin and end anywhere inside a packet: bytes that do not yet
    /// form a whole packet are kept and prepended to the next call. When the byte
    /// at the current position is not the sync byte, the input is skipped one byte
    /// at a time until alignment is found again.
    ///
    /// A track's buffered data is only emitted when the next PES packet starts on
    /// the same PID, so the most recent unit of each track stays buffered.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<TsPayload> {
        let mut out = Vec::new();
        let mut data = std::mem::take(&mut self.carry);
        data.extend_from_slice(bytes);

        let mut i = 0;
        while i + TS_PACKET_LEN <= data.len() {
            let pkt = &data[i..i + TS_PACKET_LEN];
            if pkt[0] == TS_SYNC {
                self.handle_packet(pkt, &mut out);
                i += TS_PACKET_LEN;
            } else {
                // Out of sync: slide forward one byte and try again.
                i += 1;
            }
        }

        // Keep the unconsumed tail in the same allocation for the next call.
        data.drain(..i);
        self.carry = data;
        out
    }

    /// Decodes the 4-byte packet header and routes the payload by PID.
    ///
    /// `pkt` must be exactly one packet (`TS_PACKET_LEN` bytes) starting with the
    /// sync byte. Packets flagged with a transport error, packets without a
    /// payload and packets on PIDs nobody listens to are dropped.
    fn handle_packet(&mut self, pkt: &[u8], out: &mut Vec<TsPayload>) {
        // transport_error_indicator: the packet is marked as corrupt, so neither
        // its header fields nor its payload can be trusted.
        if pkt[1] & 0x80 != 0 {
            return;
        }
        let pusi = pkt[1] & 0x40 != 0;
        // 13-bit PID: low 5 bits of byte 1 followed by byte 2.
        let pid = (u16::from(pkt[1] & 0x1F) << 8) | u16::from(pkt[2]);

        // adaptation_field_control: 1 = payload only, 3 = adaptation field then
        // payload. The other two values carry no payload.
        let payload_start = match (pkt[3] >> 4) & 0x03 {
            1 => 4,
            // Byte 4 is the adaptation field length; the payload follows it.
            3 => 5 + usize::from(pkt[4]),
            _ => return,
        };
        if payload_start >= TS_PACKET_LEN {
            return;
        }
        let payload = &pkt[payload_start..];

        if pid == 0 {
            self.parse_pat(payload, pusi);
        } else if self.pmt_pid == Some(pid) {
            self.parse_pmt(payload, pusi);
        } else if let Some(track) = self.video.as_mut().filter(|t| t.pid == pid) {
            track.feed(payload, pusi, out);
        } else if let Some(track) = self.audio.as_mut().filter(|t| t.pid == pid) {
            track.feed(payload, pusi, out);
        }
    }

    /// Reads a PAT section and remembers the PMT PID of the first program whose
    /// program number is not 0.
    ///
    /// Only packets that start a section (`pusi`) and whose table id is 0x00 are
    /// considered. Sections are not reassembled across packets, so a continuation
    /// packet has no header to parse and is ignored.
    ///
    /// NOTE(review): the section's declared length is not consulted; entries are
    /// bounded only by the bytes left in the packet minus 4 trailing bytes.
    fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
        const PAT_TABLE_ID: u8 = 0x00;

        if !pusi {
            return;
        }
        let Some(section) = section_body(payload, pusi) else {
            return;
        };
        if section.first() != Some(&PAT_TABLE_ID) {
            return;
        }

        // 8 header bytes precede the 4-byte (program number, PID) entries.
        let entries_end = section.len().saturating_sub(4);
        let mut i = 8;
        while i + 4 <= entries_end {
            let program = u16::from_be_bytes([section[i], section[i + 1]]);
            if program != 0 {
                let pid = (u16::from(section[i + 2] & 0x1F) << 8) | u16::from(section[i + 3]);
                self.pmt_pid = Some(pid);
                return;
            }
            i += 4;
        }
    }

    /// Reads a PMT section and registers the first video and the first audio
    /// stream whose stream type is supported.
    ///
    /// Only packets that start a section (`pusi`) and whose table id is 0x02 are
    /// considered; continuation packets are ignored because sections are not
    /// reassembled across packets. Tracks that are already registered are kept.
    ///
    /// NOTE(review): the section's declared length is not consulted; entries are
    /// bounded only by the bytes left in the packet minus 4 trailing bytes.
    fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
        const PMT_TABLE_ID: u8 = 0x02;

        if !pusi {
            return;
        }
        let Some(section) = section_body(payload, pusi) else {
            return;
        };
        if section.len() < 12 || section[0] != PMT_TABLE_ID {
            return;
        }

        // 12-bit program info length at bytes 10-11; stream entries follow it.
        let program_info_len = (usize::from(section[10] & 0x0F) << 8) | usize::from(section[11]);
        let entries_end = section.len().saturating_sub(4);
        let mut i = 12 + program_info_len;
        while i + 5 <= entries_end {
            let stream_type = section[i];
            let pid = (u16::from(section[i + 1] & 0x1F) << 8) | u16::from(section[i + 2]);
            let es_info_len =
                (usize::from(section[i + 3] & 0x0F) << 8) | usize::from(section[i + 4]);

            match stream_type_to_track(stream_type) {
                Some((codec, TsTrackKind::Video)) if self.video.is_none() => {
                    self.video = Some(Track::new(pid, codec, TsTrackKind::Video));
                }
                Some((codec, TsTrackKind::Audio)) if self.audio.is_none() => {
                    self.audio = Some(Track::new(pid, codec, TsTrackKind::Audio));
                }
                _ => {}
            }
            // Skip the 5-byte entry header and its descriptor bytes.
            i += 5 + es_info_len;
        }
    }
}
/// Maps the stream-type byte of a PMT entry to the codec and track kind used
/// for it, or `None` when the stream type is not supported.
fn stream_type_to_track(stream_type: u8) -> Option<(CodecId, TsTrackKind)> {
    let track = match stream_type {
        0x1B => (CodecId::H264, TsTrackKind::Video),
        0x24 => (CodecId::H265, TsTrackKind::Video),
        0x0F | 0x11 => (CodecId::AAC, TsTrackKind::Audio),
        0x03 | 0x04 => (CodecId::MP3, TsTrackKind::Audio),
        _ => return None,
    };
    Some(track)
}
/// Returns the part of a PSI packet payload where section bytes begin.
///
/// When `pusi` is set, the first payload byte is a pointer giving the number of
/// bytes to skip after it; the result is `None` if the payload is empty or the
/// pointer reaches past its end. Without `pusi` the payload is returned as is.
fn section_body(payload: &[u8], pusi: bool) -> Option<&[u8]> {
    if !pusi {
        return Some(payload);
    }
    // One byte for the pointer itself plus the bytes it tells us to skip.
    let skip = 1 + usize::from(*payload.first()?);
    payload.get(skip..)
}
/// Parses the start of a PES packet.
///
/// Returns `(pts, es_offset)`, where `es_offset` is the index in `p` at which
/// the elementary-stream bytes begin (clamped to `p.len()`), or `None` when `p`
/// is shorter than 9 bytes or does not begin with `00 00 01`.
///
/// `pts` is 0 when the header's PTS flag is clear or `p` is too short to hold
/// the 5 PTS bytes.
fn parse_pes_header(p: &[u8]) -> Option<(i64, usize)> {
    let looks_like_pes = p.len() >= 9 && p[..3] == [0u8, 0, 1];
    if !looks_like_pes {
        return None;
    }

    // Byte 8 is the length of the optional header fields that follow it.
    let es_offset = p.len().min(9 + usize::from(p[8]));

    // Top two bits of byte 7 are the PTS/DTS flags; 0b10 within them means a PTS follows.
    let has_pts = (p[7] >> 6) & 0x02 != 0;
    let pts = match p.get(9..14) {
        Some(b) if has_pts => {
            // The PTS is split 3 + 15 + 15 bits, each group followed by a marker bit.
            let high = (i64::from(b[0]) >> 1) & 0x07;
            let middle = (i64::from(b[1]) << 7) | ((i64::from(b[2]) >> 1) & 0x7F);
            let low = (i64::from(b[3]) << 7) | ((i64::from(b[4]) >> 1) & 0x7F);
            (high << 30) | (middle << 15) | low
        }
        _ => 0,
    };

    Some((pts, es_offset))
}
/// Reports whether `es` contains a keyframe NAL unit for `codec`.
///
/// The buffer is scanned for `00 00 01` and `00 00 00 01` start codes and the
/// byte after each one is inspected as a NAL header:
///
/// * H.264 - NAL unit type 5 (low 5 bits of the header).
/// * H.265 - NAL unit types 16 to 21 (bits 1-6 of the header).
///
/// Any other codec yields `false`.
fn is_keyframe(es: &[u8], codec: CodecId) -> bool {
    let mut i = 0;
    // `i + 3 < len` leaves room for a 3-byte start code at `i` plus the NAL
    // header byte after it, so a NAL header that is the very last byte is still
    // inspected, matching what already happens after a 4-byte start code.
    while i + 3 < es.len() {
        let sc3 = es[i..i + 3] == [0u8, 0, 1];
        let sc4 = es[i..i + 4] == [0u8, 0, 0, 1];
        if !(sc3 || sc4) {
            i += 1;
            continue;
        }

        let nal_off = if sc4 { i + 4 } else { i + 3 };
        // After a 4-byte start code at the very end there is no header byte.
        if let Some(&hdr) = es.get(nal_off) {
            let is_key = match codec {
                CodecId::H264 => (hdr & 0x1F) == 5,
                CodecId::H265 => (16..=21).contains(&((hdr >> 1) & 0x3F)),
                _ => false,
            };
            if is_key {
                return true;
            }
        }
        // Resume scanning at the NAL header.
        i = nal_off;
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one payload-only TS packet on `pid`.
    ///
    /// `payload` is truncated to what fits after the 4-byte header; the rest of
    /// the packet is zero-filled.
    fn ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> Vec<u8> {
        let mut pkt = vec![0u8; TS_PACKET_LEN];
        pkt[0] = TS_SYNC;
        let pusi_bit: u8 = if pusi { 0x40 } else { 0 };
        pkt[1] = pusi_bit | ((pid >> 8) as u8 & 0x1F);
        pkt[2] = (pid & 0xFF) as u8;
        pkt[3] = 0x10; // payload only, no adaptation field
        let n = payload.len().min(TS_PACKET_LEN - 4);
        pkt[4..4 + n].copy_from_slice(&payload[..n]);
        pkt
    }

    /// PAT packet announcing program 1 with its PMT on PID 0x1000.
    fn pat() -> Vec<u8> {
        let mut sec = vec![0u8]; // pointer field
        sec.extend_from_slice(&[0x00, 0xB0, 0x0D, 0, 0, 0xC1, 0, 0]);
        sec.extend_from_slice(&[0x00, 0x01]); // program number 1
        sec.extend_from_slice(&[0xE0 | 0x10, 0x00]); // PMT PID 0x1000
        sec.extend_from_slice(&[0, 0, 0, 0]); // CRC placeholder (never verified)
        ts_packet(0, true, &sec)
    }

    /// PMT packet on PID 0x1000 with an H.264 stream on PID 0x0100 and, when
    /// `audio` is set, an AAC stream on PID 0x0101.
    ///
    /// The section length is derived from the bytes actually written, so the
    /// header stays correct for both variants.
    fn pmt_with(audio: bool) -> Vec<u8> {
        // Everything that follows the section-length field.
        let mut body = vec![0u8, 0x01, 0xC1, 0, 0];
        body.extend_from_slice(&[0xE1, 0x00]); // bytes 8-9 of the section, not read by the parser
        body.extend_from_slice(&[0xF0, 0x00]); // program info length 0
        body.extend_from_slice(&[0x1B, 0xE1, 0x00, 0xF0, 0x00]); // H.264 on PID 0x0100
        if audio {
            body.extend_from_slice(&[0x0F, 0xE1, 0x01, 0xF0, 0x00]); // AAC on PID 0x0101
        }
        body.extend_from_slice(&[0, 0, 0, 0]); // CRC placeholder (never verified)

        let section_len = body.len() as u16;
        let mut sec = vec![
            0u8, // pointer field
            0x02,
            0xB0 | (section_len >> 8) as u8,
            (section_len & 0xFF) as u8,
        ];
        sec.extend_from_slice(&body);
        ts_packet(0x1000, true, &sec)
    }

    /// Video-only PMT packet.
    fn pmt() -> Vec<u8> {
        pmt_with(false)
    }

    /// TS packet on `pid` that starts a PES packet carrying `es` with the given PTS.
    fn pes_on(pid: u16, stream_id: u8, es: &[u8], pts: i64) -> Vec<u8> {
        // Start code, stream id, length 0, flags with PTS present, 5 header-data bytes.
        let mut p = vec![0x00, 0x00, 0x01, stream_id, 0x00, 0x00, 0x80, 0x80, 0x05];
        let pts = pts as u64;
        p.push((0x21 | (((pts >> 30) & 0x07) << 1)) as u8);
        p.push(((pts >> 22) & 0xFF) as u8);
        p.push((0x01 | (((pts >> 15) & 0x7F) << 1)) as u8);
        p.push(((pts >> 7) & 0xFF) as u8);
        p.push((0x01 | ((pts & 0x7F) << 1)) as u8);
        p.extend_from_slice(es);
        ts_packet(pid, true, &p)
    }

    fn video_pes(es: &[u8], pts: i64) -> Vec<u8> {
        pes_on(0x0100, 0xE0, es, pts)
    }

    fn audio_pes(es: &[u8], pts: i64) -> Vec<u8> {
        pes_on(0x0101, 0xC0, es, pts)
    }

    #[test]
    fn pes_header_decodes_pts() {
        let pes = video_pes(&[], 90_000);
        let (pts, _off) = parse_pes_header(&pes[4..]).unwrap();
        assert_eq!(pts, 90_000);
    }

    #[test]
    fn keyframe_detection_h264_idr() {
        let idr = [0, 0, 0, 1, 0x65, 0xAA];
        assert!(is_keyframe(&idr, CodecId::H264));
        let non_idr = [0, 0, 0, 1, 0x41, 0xAA];
        assert!(!is_keyframe(&non_idr, CodecId::H264));
    }

    #[test]
    fn full_chain_pat_pmt_pes_emits_access_unit() {
        let mut d = TsDemuxer::new();
        assert!(d.push(&pat()).is_empty());
        assert!(d.push(&pmt()).is_empty());
        assert_eq!(d.video.as_ref().unwrap().pid, 0x0100);
        assert_eq!(d.video.as_ref().unwrap().codec, CodecId::H264);

        // The first PES packet is only emitted once the next one starts.
        let idr = [0, 0, 0, 1, 0x65, 0x11, 0x22];
        assert!(d.push(&video_pes(&idr, 9000)).is_empty());
        let delta = [0, 0, 0, 1, 0x41, 0x33];
        let out = d.push(&video_pes(&delta, 12000));

        assert_eq!(out.len(), 1);
        assert_eq!(out[0].codec, CodecId::H264);
        assert_eq!(out[0].kind, TsTrackKind::Video);
        assert_eq!(out[0].pts_ms, 100);
        assert!(out[0].keyframe);
        assert!(out[0].data.starts_with(&idr));
    }

    #[test]
    fn carries_partial_packet_across_pushes() {
        let mut d = TsDemuxer::new();
        let p = pat();
        assert!(d.push(&p[..100]).is_empty());
        assert!(d.push(&p[100..]).is_empty());
        d.push(&pmt());
        assert_eq!(d.video.as_ref().unwrap().pid, 0x0100);
    }

    #[test]
    fn demuxes_audio_track_alongside_video() {
        let mut d = TsDemuxer::new();
        d.push(&pat());
        d.push(&pmt_with(true));
        assert_eq!(d.audio.as_ref().unwrap().pid, 0x0101);
        assert_eq!(d.audio.as_ref().unwrap().codec, CodecId::AAC);

        let adts = [0xFF, 0xF1, 0x4C, 0x80, 0x01, 0x23];
        assert!(d.push(&audio_pes(&adts, 18000)).is_empty());
        let out = d.push(&audio_pes(&[0xFF, 0xF1, 0x00], 19000));

        assert_eq!(out.len(), 1);
        let au = &out[0];
        assert_eq!(au.kind, TsTrackKind::Audio);
        assert_eq!(au.codec, CodecId::AAC);
        assert!(!au.keyframe, "audio access units are never keyframes");
        assert_eq!(au.pts_ms, 200);
        assert!(au.data.starts_with(&adts));
    }

    #[test]
    fn pes_header_with_oversized_declared_length_is_clamped() {
        let p = [0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80, 0x00, 0xFF, 0xAA];
        let (_pts, es_offset) = parse_pes_header(&p).unwrap();
        assert_eq!(es_offset, p.len(), "offset clamped to payload length");
        // Slicing at the clamped offset must not panic.
        let _ = &p[es_offset..];
    }

    #[test]
    fn demuxer_survives_oversized_pes_header() {
        let mut d = TsDemuxer::new();
        d.push(&pat());
        d.push(&pmt());
        let mut pes = vec![0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80, 0x00, 0xFF];
        pes.extend_from_slice(&[0x11, 0x22]);
        let _ = d.push(&ts_packet(0x0100, true, &pes));
    }

    #[test]
    fn audio_only_stream_type_maps_to_aac() {
        assert_eq!(
            stream_type_to_track(0x0F),
            Some((CodecId::AAC, TsTrackKind::Audio))
        );
        assert_eq!(
            stream_type_to_track(0x03),
            Some((CodecId::MP3, TsTrackKind::Audio))
        );
        assert!(stream_type_to_track(0x99).is_none());
    }
}