use super::Muxer;
use crate::{CodecId, MediaFrame, Result};
use bytes::{BufMut, Bytes, BytesMut};
/// Size in bytes of every transport stream packet this muxer emits.
const TS_PACKET_LEN: usize = 188;
/// First byte of every transport stream packet.
const SYNC_BYTE: u8 = 0x47;
/// PID on which the Program Association Table is sent.
const PID_PAT: u16 = 0x0000;
/// PID on which the Program Map Table is sent (advertised by the PAT).
const PID_PMT: u16 = 0x1000;
/// PID carrying the video elementary stream; also written as the PCR PID in the PMT.
const PID_VIDEO: u16 = 0x0100;
/// PID carrying the audio elementary stream.
const PID_AUDIO: u16 = 0x0101;
/// PMT `stream_type` value written for H.264 video.
const STREAM_TYPE_H264: u8 = 0x1B;
/// PMT `stream_type` value written for H.265 video.
const STREAM_TYPE_H265: u8 = 0x24;
/// PMT `stream_type` value written for AAC audio (ADTS framing, per the name).
const STREAM_TYPE_AAC_ADTS: u8 = 0x0F;
/// PES `stream_id` used for video packets.
const PES_STREAM_ID_VIDEO: u8 = 0xE0;
/// PES `stream_id` used for audio packets.
const PES_STREAM_ID_AUDIO: u8 = 0xC0;
/// Multiplier applied to frame timestamps to obtain 90 kHz clock ticks.
/// NOTE(review): this implies `MediaFrame::pts`/`dts` are in milliseconds — confirm.
const TS_CLOCK_HZ_PER_MS: i64 = 90;
/// MPEG-TS muxer carrying one H.264/H.265 video stream and, optionally, AAC audio.
///
/// Transport packets are accumulated in an internal buffer as frames are written
/// and handed out as one contiguous segment.
#[derive(Debug)]
pub struct MpegTsMuxer {
    /// Transport packets accumulated for the segment currently being built.
    buf: BytesMut,
    /// Continuity counter (low 4 bits used) for packets on the PAT PID.
    cc_pat: u8,
    /// Continuity counter (low 4 bits used) for packets on the PMT PID.
    cc_pmt: u8,
    /// Continuity counter (low 4 bits used) for packets on the video PID.
    cc_video: u8,
    /// Continuity counter (low 4 bits used) for packets on the audio PID.
    cc_audio: u8,
    /// Codec string derived from the most recent video configuration frame, if any.
    codec_string: Option<String>,
    /// Whether any AAC frame has been seen; controls the audio entry in the PMT.
    has_audio: bool,
    /// Codec of the most recently written video frame; selects the PMT stream type.
    video_codec: Option<CodecId>,
    /// Whether PAT and PMT have already been emitted for the current segment.
    psi_written: bool,
}
impl Default for MpegTsMuxer {
fn default() -> Self {
Self::new()
}
}
impl MpegTsMuxer {
    /// Creates a muxer with an empty buffer, zeroed continuity counters, no
    /// known codecs and no PSI tables written yet.
    pub fn new() -> Self {
        Self {
            buf: BytesMut::new(),
            cc_pat: 0,
            cc_pmt: 0,
            cc_video: 0,
            cc_audio: 0,
            codec_string: None,
            has_audio: false,
            video_codec: None,
            psi_written: false,
        }
    }

    /// PMT `stream_type` for the current video codec; anything other than
    /// H.265 (including "not known yet") is declared as H.264.
    fn video_stream_type(&self) -> u8 {
        match self.video_codec {
            Some(CodecId::H265) => STREAM_TYPE_H265,
            _ => STREAM_TYPE_H264,
        }
    }

    /// Emits one PAT packet followed by one PMT packet unless that has already
    /// happened since `psi_written` was last cleared.
    fn ensure_psi(&mut self) {
        if self.psi_written {
            return;
        }
        let pat = build_pat();
        push_section_packet(&mut self.buf, PID_PAT, &mut self.cc_pat, &pat);
        let pmt = build_pmt(self.has_audio, self.video_stream_type());
        push_section_packet(&mut self.buf, PID_PMT, &mut self.cc_pmt, &pmt);
        self.psi_written = true;
    }

    /// Wraps one video frame in a PES packet (PTS + DTS) and appends its
    /// transport packets to the buffer.
    ///
    /// Keyframes additionally carry a PCR (set to the frame's DTS) and the
    /// random-access indicator on their first packet.
    fn write_video(&mut self, frame: &MediaFrame) {
        // Timestamps are truncated to 33 bits when serialised, so wrapping
        // arithmetic yields the same bytes as an exact product while avoiding
        // an overflow panic in debug builds on extreme input values.
        let pts = frame.pts.wrapping_mul(TS_CLOCK_HZ_PER_MS);
        let dts = frame.dts.wrapping_mul(TS_CLOCK_HZ_PER_MS);
        let codec = self.video_codec.unwrap_or(CodecId::H264);
        let es = ensure_aud(&frame.data, codec);
        let pes = build_pes(PES_STREAM_ID_VIDEO, pts, Some(dts), &es);
        let keyframe = frame.is_keyframe();
        let pcr = keyframe.then_some(dts);
        // `buf` and `cc_video` are disjoint fields, so both can be borrowed
        // mutably at once; no temporary counter copy is needed.
        packetize(
            &mut self.buf,
            PID_VIDEO,
            &mut self.cc_video,
            &pes,
            pcr,
            keyframe,
        );
    }

    /// Wraps one audio frame in a PES packet (PTS only) and appends its
    /// transport packets to the buffer. Audio packets never carry a PCR or the
    /// random-access indicator.
    fn write_audio(&mut self, frame: &MediaFrame) {
        // Same 33-bit truncation argument as for video timestamps.
        let pts = frame.pts.wrapping_mul(TS_CLOCK_HZ_PER_MS);
        let pes = build_pes(PES_STREAM_ID_AUDIO, pts, None, &frame.data);
        packetize(
            &mut self.buf,
            PID_AUDIO,
            &mut self.cc_audio,
            &pes,
            None,
            false,
        );
    }
}
impl Muxer for MpegTsMuxer {
    /// File extension for segments produced by this muxer.
    fn extension(&self) -> &'static str {
        "ts"
    }

    /// Starts a new segment: discards anything still buffered and arranges for
    /// PAT/PMT to be emitted again before the next frame. Continuity counters
    /// are intentionally left untouched so they keep running across segments.
    fn start_segment(&mut self) -> Result<()> {
        self.buf.clear();
        self.psi_written = false;
        Ok(())
    }

    /// Muxes one frame into the current segment.
    ///
    /// * H.264 / H.265 frames update the tracked video codec, refresh the
    ///   codec string when flagged `CONFIG`, and are written as video.
    /// * AAC frames flagged `CONFIG` only register the presence of audio;
    ///   other AAC frames are written as audio.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::UnsupportedCodec` for any other codec.
    fn write(&mut self, frame: &MediaFrame) -> Result<()> {
        match frame.codec {
            CodecId::H264 | CodecId::H265 => {
                self.video_codec = Some(frame.codec);
                if frame.flags.contains(crate::FrameFlags::CONFIG) {
                    self.codec_string = video_codec_string(frame.codec, &frame.data);
                }
                self.ensure_psi();
                self.write_video(frame);
            }
            CodecId::AAC => {
                if !self.has_audio {
                    self.has_audio = true;
                    // A PMT already written for this segment does not list the
                    // audio PID. Re-arm PSI emission so an updated PAT/PMT
                    // precedes the first audio packet instead of leaving the
                    // audio stream undeclared until the next segment.
                    self.psi_written = false;
                }
                if frame.flags.contains(crate::FrameFlags::CONFIG) {
                    // Configuration frames carry no audio payload to mux.
                    return Ok(());
                }
                self.ensure_psi();
                self.write_audio(frame);
            }
            other => {
                return Err(crate::StreamError::UnsupportedCodec(format!(
                    "MpegTsMuxer carries H.264/H.265 + AAC only, got {other:?}"
                )));
            }
        }
        Ok(())
    }

    /// Returns everything buffered for the current segment and leaves an
    /// empty buffer behind.
    fn finish_segment(&mut self) -> Result<Bytes> {
        Ok(std::mem::take(&mut self.buf).freeze())
    }

    /// Codec string derived from the last video configuration frame, if any.
    fn codec_string(&self) -> Option<String> {
        self.codec_string.clone()
    }
}
/// Builds the Program Association Table section (CRC included) announcing a
/// single program, number 1, whose PMT lives on `PID_PMT`.
fn build_pat() -> Vec<u8> {
    // Bytes following the length field: 5 fixed header bytes, one 4-byte
    // program entry and the 4-byte CRC.
    const SECTION_LEN: u16 = 5 + 4 + 4;
    let [pmt_hi, pmt_lo] = PID_PMT.to_be_bytes();

    let mut section = Vec::with_capacity(16);
    section.extend_from_slice(&[
        // table_id
        0x00,
        // section_syntax_indicator + reserved bits + section_length (12 bits)
        0xB0 | ((SECTION_LEN >> 8) & 0x0F) as u8,
        (SECTION_LEN & 0xFF) as u8,
        // transport_stream_id
        0x00,
        0x01,
        // reserved bits, version 0, current_next_indicator
        0xC1,
        // section_number, last_section_number
        0x00,
        0x00,
        // program_number
        0x00,
        0x01,
        // reserved bits + program_map_PID (13 bits)
        0xE0 | (pmt_hi & 0x1F),
        pmt_lo,
    ]);
    append_crc32(&mut section);
    section
}
/// Builds the Program Map Table section (CRC included).
///
/// The table always lists the video stream on `PID_VIDEO` with the given
/// `video_stream_type`, uses `PID_VIDEO` as the PCR PID, and lists AAC audio
/// on `PID_AUDIO` only when `has_audio` is true.
fn build_pmt(has_audio: bool, video_stream_type: u8) -> Vec<u8> {
    /// One elementary-stream entry: type, 13-bit PID, zero-length ES info.
    fn es_entry(stream_type: u8, pid: u16) -> [u8; 5] {
        let [hi, lo] = pid.to_be_bytes();
        [stream_type, 0xE0 | (hi & 0x1F), lo, 0xF0, 0x00]
    }

    let mut streams = Vec::with_capacity(10);
    streams.extend_from_slice(&es_entry(video_stream_type, PID_VIDEO));
    if has_audio {
        streams.extend_from_slice(&es_entry(STREAM_TYPE_AAC_ADTS, PID_AUDIO));
    }

    // Bytes following the length field: 9 fixed header bytes, the stream
    // entries and the 4-byte CRC.
    let section_len = 9 + streams.len() + 4;
    let [pcr_hi, pcr_lo] = PID_VIDEO.to_be_bytes();

    let mut section = Vec::with_capacity(20 + streams.len());
    section.extend_from_slice(&[
        // table_id
        0x02,
        // section_syntax_indicator + reserved bits + section_length (12 bits)
        0xB0 | ((section_len >> 8) & 0x0F) as u8,
        (section_len & 0xFF) as u8,
        // program_number
        0x00,
        0x01,
        // reserved bits, version 0, current_next_indicator
        0xC1,
        // section_number, last_section_number
        0x00,
        0x00,
        // reserved bits + PCR_PID (13 bits)
        0xE0 | (pcr_hi & 0x1F),
        pcr_lo,
        // reserved bits + program_info_length = 0
        0xF0,
        0x00,
    ]);
    section.extend_from_slice(&streams);
    append_crc32(&mut section);
    section
}
/// Emits a single transport packet carrying `section` on `pid`.
///
/// The packet has the payload-unit-start flag set, a zero pointer field and
/// is padded with `0xFF` up to `TS_PACKET_LEN`. `cc` supplies the continuity
/// counter and is advanced modulo 16.
fn push_section_packet(out: &mut BytesMut, pid: u16, cc: &mut u8, section: &[u8]) {
    let counter = *cc & 0x0F;
    *cc = (*cc + 1) & 0x0F;

    let header = [
        SYNC_BYTE,
        // payload_unit_start_indicator + top 5 bits of the PID
        0x40 | ((pid >> 8) & 0x1F) as u8,
        (pid & 0xFF) as u8,
        // payload only (no adaptation field) + continuity counter
        0x10 | counter,
        // pointer_field: the section starts immediately
        0x00,
    ];

    let mut packet = Vec::with_capacity(TS_PACKET_LEN);
    packet.extend_from_slice(&header);
    packet.extend_from_slice(section);
    packet.resize(TS_PACKET_LEN, 0xFF);
    out.put_slice(&packet);
}
fn build_pes(stream_id: u8, pts: i64, dts: Option<i64>, payload: &[u8]) -> Vec<u8> {
let pts_dts_flags = if dts.is_some() { 0b11 } else { 0b10 };
let header_data_len = if dts.is_some() { 10 } else { 5 };
let mut pes = Vec::with_capacity(payload.len() + 14);
pes.extend_from_slice(&[0x00, 0x00, 0x01]); pes.push(stream_id);
let pes_payload_len = 3 + header_data_len + payload.len();
let length_field = if stream_id == PES_STREAM_ID_VIDEO {
0
} else {
pes_payload_len.min(0xFFFF) as u16
};
pes.put_u16(length_field);
pes.push(0x80); pes.push((pts_dts_flags << 6) as u8); pes.push(header_data_len as u8);
write_timestamp(&mut pes, if dts.is_some() { 0b0011 } else { 0b0010 }, pts);
if let Some(dts) = dts {
write_timestamp(&mut pes, 0b0001, dts);
}
pes.extend_from_slice(payload);
pes
}
/// Appends a 5-byte PES timestamp field to `out`.
///
/// The low 33 bits of `ts` are split into 3 + 15 + 15 bits, each group
/// followed by a marker bit set to 1; `prefix` occupies the top four bits of
/// the first byte.
fn write_timestamp(out: &mut Vec<u8>, prefix: u8, ts: i64) {
    const MARKER: u8 = 0x01;

    let ticks = (ts as u64) & 0x1_FFFF_FFFF;
    let top3 = ((ticks >> 30) & 0x07) as u8;
    let mid15 = ((ticks >> 15) & 0x7FFF) as u16;
    let low15 = (ticks & 0x7FFF) as u16;

    out.extend_from_slice(&[
        (prefix << 4) | (top3 << 1) | MARKER,
        (mid15 >> 7) as u8,
        (((mid15 & 0x7F) as u8) << 1) | MARKER,
        (low15 >> 7) as u8,
        (((low15 & 0x7F) as u8) << 1) | MARKER,
    ]);
}
/// Splits `pes` into 188-byte transport packets on `pid` and appends them to
/// `out`.
///
/// * The first packet has the payload-unit-start flag set. If `pcr` is given
///   or `rai` is true it also carries an adaptation field with the PCR and/or
///   the random-access indicator.
/// * Any packet whose remaining payload cannot fill the 184-byte body gets an
///   adaptation field padded with `0xFF` stuffing.
/// * `cc` supplies the continuity counter and is advanced modulo 16 per packet.
fn packetize(out: &mut BytesMut, pid: u16, cc: &mut u8, pes: &[u8], pcr: Option<i64>, rai: bool) {
    // Bytes available after the 4-byte transport header.
    const BODY_LEN: usize = TS_PACKET_LEN - 4;

    let mut rest = pes;
    let mut is_first = true;

    while !rest.is_empty() {
        // Signalling (PCR / random access) only ever goes on the first packet.
        let signalled = is_first && (rai || pcr.is_some());
        let pcr_here = if signalled { pcr } else { None };
        let with_af = signalled || rest.len() < BODY_LEN;

        // Length byte + flags byte, plus 6 bytes when a PCR is present.
        let af_overhead = 2 + if pcr_here.is_some() { 6 } else { 0 };
        let (chunk_len, pad) = if with_af {
            let room = BODY_LEN - af_overhead;
            let n = rest.len().min(room);
            (n, room - n)
        } else {
            (BODY_LEN, 0)
        };
        let (chunk, tail) = rest.split_at(chunk_len);

        let pusi: u8 = if is_first { 0x40 } else { 0x00 };
        let afc: u8 = if with_af { 0b11 } else { 0b01 };

        let mut packet = Vec::with_capacity(TS_PACKET_LEN);
        packet.extend_from_slice(&[
            SYNC_BYTE,
            pusi | ((pid >> 8) & 0x1F) as u8,
            (pid & 0xFF) as u8,
            (afc << 4) | (*cc & 0x0F),
        ]);
        *cc = (*cc + 1) & 0x0F;

        if with_af {
            let mut flags = 0u8;
            if signalled && rai {
                // random_access_indicator
                flags |= 0x40;
            }
            if pcr_here.is_some() {
                // PCR_flag
                flags |= 0x10;
            }
            // adaptation_field_length counts everything after itself.
            let af_len = af_overhead - 1 + pad;
            packet.push(af_len as u8);
            packet.push(flags);
            if let Some(base) = pcr_here {
                write_pcr(&mut packet, base);
            }
            packet.resize(packet.len() + pad, 0xFF);
        }

        packet.extend_from_slice(chunk);
        debug_assert_eq!(packet.len(), TS_PACKET_LEN);
        out.put_slice(&packet);

        rest = tail;
        is_first = false;
    }
}
/// Appends the 6-byte PCR field to `out`: the low 33 bits of `base90` as the
/// base, six reserved bits set to 1 and a 9-bit extension of zero.
fn write_pcr(out: &mut Vec<u8>, base90: i64) {
    let base = (base90 as u64) & 0x1_FFFF_FFFF;
    // Lay the 48-bit field out in the low bits of a u64:
    //   [ base: 33 bits ][ reserved: 6 bits, all 1 ][ extension: 9 bits, 0 ]
    let field: u64 = (base << 15) | (0x3F << 9);
    out.extend_from_slice(&field.to_be_bytes()[2..]);
}
/// Returns `annexb` guaranteed to begin with an access unit delimiter NAL.
///
/// If the first NAL unit (after a 3- or 4-byte start code) is already an AUD
/// for `codec`, the input is copied unchanged; otherwise a fixed AUD NAL is
/// prepended. `CodecId::H265` uses the H.265 NAL header layout and AUD bytes;
/// every other codec is treated as H.264.
fn ensure_aud(annexb: &[u8], codec: CodecId) -> Bytes {
    const H264_AUD: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x09, 0xF0];
    const H265_AUD: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x46, 0x01, 0x50];

    // First byte of the leading NAL unit header, if the buffer opens with a
    // start code and has at least one byte after it.
    let first_header = match annexb {
        [0, 0, 0, 1, hdr, ..] | [0, 0, 1, hdr, ..] => Some(*hdr),
        _ => None,
    };

    let (aud, already_delimited) = match codec {
        CodecId::H265 => (
            H265_AUD,
            // H.265: nal_unit_type is bits 1..=6 of the first byte; AUD is 35.
            matches!(first_header, Some(h) if ((h >> 1) & 0x3F) == 35),
        ),
        _ => (
            H264_AUD,
            // H.264: nal_unit_type is the low 5 bits; AUD is 9.
            matches!(first_header, Some(h) if (h & 0x1F) == 9),
        ),
    };

    if already_delimited {
        return Bytes::copy_from_slice(annexb);
    }

    let mut delimited = BytesMut::with_capacity(annexb.len() + aud.len());
    delimited.put_slice(aud);
    delimited.put_slice(annexb);
    delimited.freeze()
}
/// Derives the codec string for a video configuration frame.
///
/// For H.264 the configuration in `annexb` is parsed and turned into a codec
/// string via `hls_codec_string`; `None` is returned when parsing fails.
/// No codec string is produced for any other codec.
fn video_codec_string(codec: CodecId, annexb: &[u8]) -> Option<String> {
    // Brings the trait providing `parse_config` into scope.
    use crate::codec::parser::CodecParser;
    match codec {
        CodecId::H264 => {
            let params = crate::codec::h264::H264::parse_config(annexb)?;
            Some(crate::codec::h264::hls_codec_string(&params))
        }
        _ => None,
    }
}
/// Computes a CRC-32 over the current contents of `data` and appends it as
/// four big-endian bytes.
///
/// Parameters: polynomial `0x04C11DB7`, initial value `0xFFFFFFFF`,
/// most-significant-bit first, no final XOR.
fn append_crc32(data: &mut Vec<u8>) {
    const POLY: u32 = 0x04C1_1DB7;

    let checksum = data.iter().fold(0xFFFF_FFFFu32, |acc, &byte| {
        // Feed the byte into the top of the register, then shift out 8 bits.
        (0..8).fold(acc ^ (u32::from(byte) << 24), |reg, _| {
            let shifted = reg << 1;
            if reg & 0x8000_0000 == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    });

    data.extend_from_slice(&checksum.to_be_bytes());
}
// Unit tests for the MPEG-TS muxer: packet alignment, PSI ordering, PMT
// stream types, adaptation-field signalling, multi-packet payloads, codec
// string extraction and the CRC-32 implementation.
#[cfg(test)]
mod tests {
use super::*;
use crate::FrameFlags;
use bytes::Bytes;
// Annex B buffer with three start-code-delimited NAL units whose header
// bytes are 0x67, 0x68 and 0x65 (H.264 NAL types 7, 8 and 5).
fn annexb_keyframe() -> Bytes {
Bytes::from_static(&[
0, 0, 0, 1, 0x67, 0x42, 0x00, 0x1F, 0xF4, 0x02, 0x80, 0x2D, 0x80, 0, 0, 0, 1, 0x68, 0xCE, 0x3C, 0x80, 0, 0, 0, 1, 0x65, 0x88, 0x84, 0x00, ])
}
// Asserts that `ts` is a whole number of 188-byte packets and that each
// one begins with the sync byte.
fn all_packets_aligned(ts: &[u8]) {
assert_eq!(ts.len() % TS_PACKET_LEN, 0, "stream is packet-aligned");
for chunk in ts.chunks(TS_PACKET_LEN) {
assert_eq!(chunk[0], SYNC_BYTE, "every packet starts with 0x47");
}
}
// Extracts the 13-bit PID of every packet in `ts`, in stream order.
fn pids(ts: &[u8]) -> Vec<u16> {
ts.chunks(TS_PACKET_LEN)
.map(|p| (((p[1] & 0x1F) as u16) << 8) | p[2] as u16)
.collect()
}
// A segment must start with PAT then PMT, followed by video packets.
#[test]
fn segment_opens_with_pat_pmt_and_is_packet_aligned() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
let kf = MediaFrame::new_video(0, 0, annexb_keyframe(), CodecId::H264, true);
m.write(&kf).unwrap();
let seg = m.finish_segment().unwrap();
all_packets_aligned(&seg);
let pids = pids(&seg);
assert_eq!(pids[0], PID_PAT, "first packet is the PAT");
assert_eq!(pids[1], PID_PMT, "second packet is the PMT");
assert!(pids[2..].contains(&PID_VIDEO), "video PID present");
}
// An H.265 frame without a leading AUD must yield an HEVC stream type in
// the PMT and an inserted HEVC access unit delimiter in the output.
#[test]
fn hevc_segment_declares_h265_stream_type_and_inserts_aud() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
let hevc = Bytes::from_static(&[
0, 0, 0, 1, 0x40, 0x01, 0xAA, 0, 0, 0, 1, 0x26, 0x01, 0x88, 0x99, ]);
let kf = MediaFrame::new_video(0, 0, hevc, CodecId::H265, true);
m.write(&kf).unwrap();
let seg = m.finish_segment().unwrap();
all_packets_aligned(&seg);
let pmt = seg
.chunks(TS_PACKET_LEN)
.find(|p| (((p[1] & 0x1F) as u16) << 8 | p[2] as u16) == PID_PMT)
.expect("PMT packet");
// Offset 5 skips the 4-byte TS header and the pointer field; the first
// stream_type sits 12 bytes into the PMT section.
assert_eq!(pmt[5 + 12], STREAM_TYPE_H265, "PMT declares HEVC");
assert!(pids(&seg)[2..].contains(&PID_VIDEO), "video PID present");
let has_hevc_aud = seg.windows(7).any(|w| w == [0, 0, 0, 1, 0x46, 0x01, 0x50]);
assert!(has_hevc_aud, "HEVC access unit delimiter inserted");
}
// H.264 input must keep producing the AVC stream type in the PMT.
#[test]
fn h264_segment_still_declares_avc_stream_type() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
m.write(&MediaFrame::new_video(
0,
0,
annexb_keyframe(),
CodecId::H264,
true,
))
.unwrap();
let seg = m.finish_segment().unwrap();
let pmt = seg
.chunks(TS_PACKET_LEN)
.find(|p| (((p[1] & 0x1F) as u16) << 8 | p[2] as u16) == PID_PMT)
.expect("PMT packet");
// Same offset reasoning as in the HEVC test: header + pointer + 12.
assert_eq!(pmt[5 + 12], STREAM_TYPE_H264, "PMT declares H.264");
}
// The first video packet of a keyframe must carry an adaptation field
// with both the random-access indicator and the PCR flag set.
#[test]
fn keyframe_packet_carries_pcr_and_random_access() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
m.write(&MediaFrame::new_video(
0,
0,
annexb_keyframe(),
CodecId::H264,
true,
))
.unwrap();
let seg = m.finish_segment().unwrap();
let vpkt = seg
.chunks(TS_PACKET_LEN)
.find(|p| (((p[1] & 0x1F) as u16) << 8 | p[2] as u16) == PID_VIDEO)
.expect("video packet");
let afc = (vpkt[3] >> 4) & 0x03;
assert_eq!(afc, 0b11, "adaptation field + payload present");
// Byte 4 is adaptation_field_length; byte 5 holds the flags.
let flags = vpkt[5];
assert_ne!(flags & 0x40, 0, "random_access_indicator set");
assert_ne!(flags & 0x10, 0, "PCR_flag set");
}
// Writing a plain AAC frame must produce packets on the audio PID.
// NOTE(review): despite the name, this only checks that audio packets are
// present; the PMT contents are not inspected.
#[test]
fn audio_is_declared_in_pmt_only_after_first_audio_frame() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
let adts = Bytes::from_static(&[0xFF, 0xF1, 0x50, 0x80, 0x01, 0xA0, 0x01, 0xCC]);
m.write(&MediaFrame::new_audio(0, adts, CodecId::AAC))
.unwrap();
let seg = m.finish_segment().unwrap();
assert!(pids(&seg).contains(&PID_AUDIO), "audio PID present");
}
// A payload of roughly 1 KB must be split across several video packets
// while the stream stays 188-byte aligned.
#[test]
fn long_payload_spans_multiple_packets_and_stays_aligned() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
let mut big = vec![0, 0, 0, 1, 0x65]; big.extend(std::iter::repeat_n(0xAB, 1000)); m.write(&MediaFrame::new_video(
33,
33,
Bytes::from(big),
CodecId::H264,
true,
))
.unwrap();
let seg = m.finish_segment().unwrap();
all_packets_aligned(&seg);
let video_packets = pids(&seg).iter().filter(|&&p| p == PID_VIDEO).count();
assert!(video_packets > 1, "payload spread over multiple packets");
}
// A video frame flagged CONFIG must populate the muxer's codec string
// with an "avc1."-prefixed value.
#[test]
fn config_frame_sets_codec_string() {
let mut m = MpegTsMuxer::new();
m.start_segment().unwrap();
let mut cfg = MediaFrame::new_video(0, 0, annexb_keyframe(), CodecId::H264, false);
cfg.flags |= FrameFlags::CONFIG;
m.write(&cfg).unwrap();
assert!(
m.codec_string().is_some_and(|s| s.starts_with("avc1.")),
"codec string derived from SPS"
);
}
// Checks the CRC implementation against the value expected for the ASCII
// string "123456789".
#[test]
fn crc32_matches_known_vector() {
let mut data = b"123456789".to_vec();
append_crc32(&mut data);
let crc = &data[data.len() - 4..];
assert_eq!(crc, &[0x03, 0x76, 0xE6, 0xE7]);
}
}