use std::io::{self, Write};
mod packet;
use packet::{Packet, PacketWriter};
// --- Packet identifiers (13-bit PIDs) used by this muxer -------------------

/// PID of the Program Association Table packets (see `emit_pat`).
const PID_PAT: u16 = 0x0000;
/// PID of the Program Map Table packets; also the value advertised inside the PAT.
const PID_PMT: u16 = 0x1000;
/// PID of the video elementary stream; the PMT also names it as the PCR PID.
const PID_VIDEO: u16 = 0x0100;
/// PID of the audio elementary stream (only listed in the PMT when audio is configured).
const PID_AUDIO: u16 = 0x0101;
// Not referenced anywhere in the code visible here, hence the lint allowance.
// 0x1FFF is the PID MPEG-TS reserves for null (padding) packets.
#[allow(dead_code)]
const PID_NULL: u16 = 0x1FFF;

// --- Scheduling -------------------------------------------------------------

/// PAT + PMT are re-sent whenever the running packet count is a multiple of this value.
const PSI_INTERVAL_PACKETS: u64 = 250;
/// Minimum number of video packets between two PCR-carrying packets.
const PCR_INTERVAL_PACKETS: u64 = 40;
/// Amount subtracted from a frame's PTS to derive its PCR, in 90 kHz ticks
/// (90_000 / 5 = 18_000 ticks, i.e. 200 ms).
const PCR_LEAD_90KHZ: u64 = 90_000 / 5;

// --- PMT stream_type codes --------------------------------------------------

/// stream_type written into the PMT for the HEVC video stream.
const STREAM_TYPE_HEVC: u8 = 0x24;
/// stream_type written into the PMT for AC-3 audio.
const STREAM_TYPE_AC3: u8 = 0x81;
/// stream_type written into the PMT for Dolby TrueHD audio.
const STREAM_TYPE_TRUEHD: u8 = 0x83;
/// Audio codecs the muxer can announce in the PMT.
///
/// The variant only selects the `stream_type` byte written to the PMT; the
/// audio payload itself is passed through unchanged by `write_audio`.
#[derive(Debug, Clone, Copy)]
pub enum AudioCodec {
/// Dolby Digital (AC-3).
Ac3,
/// Dolby TrueHD.
TrueHd,
}
impl AudioCodec {
    /// Returns the PMT `stream_type` byte that announces this codec.
    fn stream_type(self) -> u8 {
        match self {
            Self::Ac3 => STREAM_TYPE_AC3,
            Self::TrueHd => STREAM_TYPE_TRUEHD,
        }
    }
}
/// Transport-stream muxer for one HEVC video stream and an optional audio stream.
///
/// Frames are handed in through `write_video` / `write_audio`; each call wraps
/// the data in a PES packet and splits it into 188-byte transport packets that
/// are pushed to the underlying `PacketWriter`.
pub struct M2tsMux<W: Write> {
/// Sink receiving every finished transport packet.
out: PacketWriter<W>,
/// Raw codec-private blob supplied via `set_video_codec_private`; its NAL
/// units are prepended to the first video frame.
video_codec_private: Option<Vec<u8>>,
/// Set once the first video frame has been processed, so the parameter sets
/// are emitted at most once.
params_written: bool,
/// Configured audio codec; while `None`, `write_audio` is a no-op and the PMT
/// lists video only.
audio: Option<AudioCodec>,
/// First timestamp seen (90 kHz ticks); all later timestamps are made relative to it.
base_pts_90k: Option<u64>,
/// 4-bit continuity counter for the video PID.
cc_video: u8,
/// 4-bit continuity counter for the audio PID.
cc_audio: u8,
/// 4-bit continuity counter for the PAT PID.
cc_pat: u8,
/// 4-bit continuity counter for the PMT PID.
cc_pmt: u8,
/// Total number of transport packets written so far (PSI and media).
packets_written: u64,
/// Counter compared against `PCR_INTERVAL_PACKETS` to decide when the next
/// PCR is due; reset whenever a PCR is attached to a video packet.
video_packets_since_pcr: u64,
}
impl<W: Write> M2tsMux<W> {
    /// Creates a muxer that sends its transport packets to `writer`.
    ///
    /// No audio stream is configured and no codec-private data is set; use
    /// [`set_audio`](Self::set_audio) and
    /// [`set_video_codec_private`](Self::set_video_codec_private) before the
    /// first frame if they are needed.
    pub fn new(writer: W) -> Self {
        Self {
            out: PacketWriter::new(writer),
            video_codec_private: None,
            params_written: false,
            audio: None,
            base_pts_90k: None,
            cc_video: 0,
            cc_audio: 0,
            cc_pat: 0,
            cc_pmt: 0,
            packets_written: 0,
            // Start with the PCR already "due" so the very first video PES
            // carries one. (Checking `packets_written == 0` at packet time can
            // never succeed because PAT/PMT are always written first.)
            video_packets_since_pcr: PCR_INTERVAL_PACKETS,
        }
    }

    /// Stores the HEVC decoder configuration record (`hvcC` payload) whose
    /// parameter sets are prepended to the first video frame.
    pub fn set_video_codec_private(&mut self, hvcc: Vec<u8>) {
        self.video_codec_private = Some(hvcc);
    }

    /// Enables the audio stream and selects the codec announced in the PMT.
    pub fn set_audio(&mut self, codec: AudioCodec) {
        self.audio = Some(codec);
    }

    /// Muxes one video access unit.
    ///
    /// `pts_ns` is the presentation time in nanoseconds; `data` holds
    /// length-prefixed NAL units, which are converted to Annex B before being
    /// wrapped in a PES packet. On the first call the parameter sets from the
    /// codec-private blob (if any) are emitted in front of the frame.
    ///
    /// # Errors
    /// Propagates any I/O error reported by the packet writer.
    pub fn write_video(&mut self, pts_ns: i64, data: &[u8]) -> io::Result<()> {
        let pts_90k = self.base_relative_pts(pts_ns);
        // The PCR trails the PTS by a fixed lead, clamped at zero.
        let pcr = pts_90k.saturating_sub(PCR_LEAD_90KHZ);

        let mut es = Vec::with_capacity(data.len() + 64);
        if !self.params_written {
            if let Some(cp) = &self.video_codec_private {
                let payload = hvcc_payload(cp);
                if !payload.is_empty() {
                    let params = super::hevc::length_prefixed_to_annex_b(&payload);
                    es.extend_from_slice(&params);
                }
            }
            self.params_written = true;
        }
        let annex_b = super::hevc::length_prefixed_to_annex_b(data);
        es.extend_from_slice(&annex_b);

        let pes = build_video_pes(pts_90k, &es);
        self.write_pes(PID_VIDEO, &pes, Some(pcr))
    }

    /// Muxes one audio frame; silently does nothing when no audio codec has
    /// been configured with [`set_audio`](Self::set_audio).
    ///
    /// # Errors
    /// Propagates any I/O error reported by the packet writer.
    pub fn write_audio(&mut self, pts_ns: i64, data: &[u8]) -> io::Result<()> {
        if self.audio.is_none() {
            return Ok(());
        }
        let pts_90k = self.base_relative_pts(pts_ns);
        let pes = build_audio_pes(pts_90k, data);
        self.write_pes(PID_AUDIO, &pes, None)
    }

    /// Flushes the underlying packet writer.
    ///
    /// # Errors
    /// Returns the writer's flush error, if any.
    pub fn finish(&mut self) -> io::Result<()> {
        self.out.flush()
    }

    /// Converts a nanosecond timestamp to 90 kHz ticks relative to the first
    /// timestamp ever seen. Non-positive inputs are treated as zero and
    /// timestamps earlier than the base clamp to zero.
    fn base_relative_pts(&mut self, pts_ns: i64) -> u64 {
        let raw_90k = if pts_ns > 0 {
            // ns * 90_000 / 1_000_000_000 == ns * 9 / 100_000. The product is
            // formed in u128 so very large timestamps cannot overflow; the
            // quotient always fits in u64 again.
            ((pts_ns as u128) * 9 / 100_000) as u64
        } else {
            0
        };
        let base = *self.base_pts_90k.get_or_insert(raw_90k);
        raw_90k.saturating_sub(base)
    }

    /// Splits one PES packet into transport packets on `pid`.
    ///
    /// PAT/PMT are interleaved whenever they are due. When `pcr` is given, the
    /// stream is video and enough video packets have passed since the previous
    /// PCR, the first packet of the PES carries the PCR in its adaptation
    /// field. The final (short) packet is filled up with adaptation-field
    /// stuffing so every packet is full-sized.
    fn write_pes(&mut self, pid: u16, pes: &[u8], pcr: Option<u64>) -> io::Result<()> {
        let mut offset = 0;
        let mut first = true;
        while offset < pes.len() {
            self.maybe_emit_psi()?;

            let pcr_due = self.video_packets_since_pcr >= PCR_INTERVAL_PACKETS;
            let af_body = match pcr {
                Some(value) if first && pid == PID_VIDEO && pcr_due => {
                    build_pcr_adaptation(value)
                }
                _ => Vec::new(),
            };
            // `build_pcr_adaptation` never returns an empty body.
            let attach_pcr = !af_body.is_empty();

            let remaining = pes.len() - offset;
            let (af_present, payload_len, stuffing) = if !attach_pcr && remaining >= 184 {
                // Plain packet: the whole 184-byte body is payload.
                (false, 184, 0)
            } else {
                // One byte goes to adaptation_field_length, then the AF body;
                // whatever the payload does not fill becomes stuffing.
                let capacity = 184 - 1 - af_body.len();
                let payload_len = remaining.min(capacity);
                (true, payload_len, capacity - payload_len)
            };

            let cc = self.advance_cc(pid);
            let mut packet = Packet::new();
            packet.set_header(pid, first, true, af_present, cc);
            if af_present {
                packet.append_adaptation(&af_body, stuffing);
            }
            packet.append_payload(&pes[offset..offset + payload_len]);
            debug_assert_eq!(packet.len(), 188, "packet not 188 bytes");
            self.out.write_packet(&packet)?;

            self.packets_written += 1;
            if pid == PID_VIDEO {
                if attach_pcr {
                    self.video_packets_since_pcr = 0;
                } else {
                    self.video_packets_since_pcr += 1;
                }
            }
            offset += payload_len;
            first = false;
        }
        Ok(())
    }

    /// Returns the continuity counter to use for the next packet on `pid` and
    /// advances the stored counter modulo 16. Unknown PIDs always get 0.
    fn advance_cc(&mut self, pid: u16) -> u8 {
        let slot = match pid {
            PID_VIDEO => &mut self.cc_video,
            PID_AUDIO => &mut self.cc_audio,
            PID_PAT => &mut self.cc_pat,
            PID_PMT => &mut self.cc_pmt,
            _ => return 0,
        };
        let cc = *slot;
        *slot = (*slot + 1) & 0x0F;
        cc
    }

    /// Writes PAT + PMT at stream start and then each time the packet count
    /// reaches a multiple of `PSI_INTERVAL_PACKETS`.
    fn maybe_emit_psi(&mut self) -> io::Result<()> {
        if self.packets_written % PSI_INTERVAL_PACKETS == 0 {
            self.emit_pat()?;
            self.emit_pmt()?;
        }
        Ok(())
    }

    /// Writes one PAT packet pointing at `PID_PMT`.
    fn emit_pat(&mut self) -> io::Result<()> {
        let payload = build_pat(PID_PMT);
        self.emit_psi(PID_PAT, &payload)
    }

    /// Writes one PMT packet describing the video and (if configured) audio stream.
    fn emit_pmt(&mut self) -> io::Result<()> {
        let payload = build_pmt(self.audio);
        self.emit_psi(PID_PMT, &payload)
    }

    /// Wraps a single PSI section (already prefixed with its pointer field) in
    /// one transport packet on `pid`, pads it to full size and writes it.
    fn emit_psi(&mut self, pid: u16, payload: &[u8]) -> io::Result<()> {
        let cc = self.advance_cc(pid);
        let mut packet = Packet::new();
        packet.set_header(pid, true, true, false, cc);
        packet.append_payload(payload);
        packet.pad_to_188();
        self.out.write_packet(&packet)?;
        self.packets_written += 1;
        Ok(())
    }
}
/// Extracts every NAL unit stored in an HEVC decoder configuration record
/// (`hvcC`) and returns them concatenated, each prefixed with its length as a
/// 4-byte big-endian integer.
///
/// Layout consumed: 22 fixed header bytes, one byte holding the number of
/// arrays, then per array one type byte (ignored), a big-endian `u16` NAL
/// count and, per NAL, a big-endian `u16` length followed by the NAL bytes.
///
/// Parsing stops at the first sign of truncation and returns what was
/// collected so far. Continuing after a truncated NAL would read NAL body
/// bytes as if they were the next array header and emit garbage.
/// A record too short to contain the array count yields an empty vector.
fn hvcc_payload(hvcc: &[u8]) -> Vec<u8> {
    const FIXED_HEADER_LEN: usize = 22;

    let mut out = Vec::new();
    if hvcc.len() <= FIXED_HEADER_LEN {
        return out;
    }
    let num_arrays = hvcc[FIXED_HEADER_LEN];
    let mut rest = &hvcc[FIXED_HEADER_LEN + 1..];

    for _ in 0..num_arrays {
        // Array header: 1 type byte + 2-byte NAL count.
        if rest.len() < 3 {
            return out;
        }
        let num_nalus = u16::from_be_bytes([rest[1], rest[2]]);
        rest = &rest[3..];

        for _ in 0..num_nalus {
            if rest.len() < 2 {
                return out;
            }
            let nal_len = u16::from_be_bytes([rest[0], rest[1]]);
            let body = &rest[2..];
            if body.len() < usize::from(nal_len) {
                // Declared length runs past the end of the record.
                return out;
            }
            let (nal, tail) = body.split_at(usize::from(nal_len));
            out.extend_from_slice(&u32::from(nal_len).to_be_bytes());
            out.extend_from_slice(nal);
            rest = tail;
        }
    }
    out
}
/// Wraps a video elementary-stream chunk in a PES packet with stream id 0xE0.
/// The PES length field is left at zero regardless of the payload size.
fn build_video_pes(pts_90k: u64, es: &[u8]) -> Vec<u8> {
    const VIDEO_STREAM_ID: u8 = 0xE0;
    let declare_length = false;
    build_pes_packet(VIDEO_STREAM_ID, pts_90k, es, declare_length)
}
/// Wraps an audio frame in a PES packet with stream id 0xBD and asks for the
/// PES length field to be filled in when the size fits.
fn build_audio_pes(pts_90k: u64, es: &[u8]) -> Vec<u8> {
    const AUDIO_STREAM_ID: u8 = 0xBD;
    let declare_length = true;
    build_pes_packet(AUDIO_STREAM_ID, pts_90k, es, declare_length)
}
/// Builds a PES packet carrying `es` with a PTS-only header.
///
/// * `stream_id` - byte placed after the `00 00 01` start code.
/// * `pts_90k` - presentation time in 90 kHz ticks; only the low 33 bits are used.
/// * `length_in_header` - when true and the size fits in 16 bits, the PES
///   length field holds the number of bytes following it; otherwise it is 0.
///
/// The returned buffer is the 14-byte header followed by `es`.
fn build_pes_packet(stream_id: u8, pts_90k: u64, es: &[u8], length_in_header: bool) -> Vec<u8> {
    // Bytes after the length field that precede the payload:
    // two flag bytes, the header-data-length byte and five PTS bytes.
    const AFTER_LENGTH_FIELD: usize = 3 + 5;

    let body_len = AFTER_LENGTH_FIELD + es.len();
    let declared_len: u16 = if length_in_header && body_len <= usize::from(u16::MAX) {
        body_len as u16
    } else {
        0
    };
    let [len_hi, len_lo] = declared_len.to_be_bytes();

    let ts = pts_90k & 0x1_FFFF_FFFF;
    let header: [u8; 14] = [
        // Start code prefix + stream id.
        0x00,
        0x00,
        0x01,
        stream_id,
        // PES packet length.
        len_hi,
        len_lo,
        // Flag bytes (second one signals "PTS only") and header data length.
        0x80,
        0x80,
        5,
        // 33-bit PTS spread over five bytes with interleaved marker bits.
        0x21 | ((ts >> 29) & 0x0E) as u8,
        (ts >> 22) as u8,
        0x01 | ((ts >> 14) & 0xFE) as u8,
        (ts >> 7) as u8,
        0x01 | ((ts << 1) & 0xFE) as u8,
    ];

    let mut pes = Vec::with_capacity(header.len() + es.len());
    pes.extend_from_slice(&header);
    pes.extend_from_slice(es);
    pes
}
fn build_pat(pmt_pid: u16) -> Vec<u8> {
let mut section = Vec::new();
section.push(0x00); section.extend_from_slice(&[0xB0, 13]);
section.extend_from_slice(&[0x00, 0x01]); section.push(0xC1); section.push(0x00); section.push(0x00); section.extend_from_slice(&[0x00, 0x01]); let pid_bytes = (0xE000u16 | (pmt_pid & 0x1FFF)).to_be_bytes();
section.extend_from_slice(&pid_bytes);
let crc = mpegts_crc32(§ion);
section.extend_from_slice(&crc.to_be_bytes());
let mut payload = Vec::with_capacity(section.len() + 1);
payload.push(0x00);
payload.extend_from_slice(§ion);
payload
}
/// Builds the payload of a PMT packet: a zero pointer field followed by a
/// Program Map Section for program 1.
///
/// The section always lists the HEVC video stream on `PID_VIDEO` (which is
/// also declared as the PCR PID) and, when `audio` is `Some`, the audio stream
/// on `PID_AUDIO` with the codec's stream type. No descriptors are written.
/// The section length is patched in once all entries are known and the
/// section ends with its CRC-32 in big-endian order.
fn build_pmt(audio: Option<AudioCodec>) -> Vec<u8> {
    let mut section = Vec::new();
    section.push(0x02); // table_id
    let len_placeholder = section.len();
    section.extend_from_slice(&[0xB0, 0x00]); // section_length, patched below
    section.extend_from_slice(&1u16.to_be_bytes()); // program_number
    section.push(0xC1); // reserved, version 0, current_next_indicator = 1
    section.push(0x00); // section_number
    section.push(0x00); // last_section_number
    let pcr_pid = (0xE000u16 | (PID_VIDEO & 0x1FFF)).to_be_bytes();
    section.extend_from_slice(&pcr_pid);
    section.extend_from_slice(&[0xF0, 0x00]); // program_info_length = 0

    // Video entry: stream_type, elementary PID, ES_info_length = 0.
    section.push(STREAM_TYPE_HEVC);
    let v_pid = (0xE000u16 | (PID_VIDEO & 0x1FFF)).to_be_bytes();
    section.extend_from_slice(&v_pid);
    section.extend_from_slice(&[0xF0, 0x00]);

    // Optional audio entry, same layout.
    if let Some(codec) = audio {
        section.push(codec.stream_type());
        let a_pid = (0xE000u16 | (PID_AUDIO & 0x1FFF)).to_be_bytes();
        section.extend_from_slice(&a_pid);
        section.extend_from_slice(&[0xF0, 0x00]);
    }

    // section_length counts the bytes after the length field, including the
    // 4 CRC bytes that are appended below.
    let section_len = section.len() - 3 + 4;
    section[len_placeholder] = 0xB0 | ((section_len >> 8) as u8 & 0x0F);
    section[len_placeholder + 1] = section_len as u8;

    let crc = mpegts_crc32(&section);
    section.extend_from_slice(&crc.to_be_bytes());

    let mut payload = Vec::with_capacity(section.len() + 1);
    payload.push(0x00); // pointer_field: section starts immediately
    payload.extend_from_slice(&section);
    payload
}
/// Builds the 7-byte adaptation-field body that carries a PCR: one flags byte
/// followed by the 6-byte PCR field (33-bit base, 6 set reserved bits, 9-bit
/// extension). The length byte that precedes an adaptation field is not included.
///
/// `pcr_90k` is the PCR base in 90 kHz ticks; only its low 33 bits are used.
/// The extension is always written as zero.
fn build_pcr_adaptation(pcr_90k: u64) -> Vec<u8> {
    // 0x40 (random_access_indicator) | 0x10 (PCR_flag).
    const FLAGS: u8 = 0x50;
    const EXTENSION: u16 = 0;

    let base = pcr_90k & 0x1_FFFF_FFFF;
    let [ext_hi, ext_lo] = EXTENSION.to_be_bytes();
    // Lowest base bit shares a byte with the reserved bits and extension bit 8.
    let base_lsb = ((base & 1) as u8) << 7;

    vec![
        FLAGS,
        (base >> 25) as u8,
        (base >> 17) as u8,
        (base >> 9) as u8,
        (base >> 1) as u8,
        base_lsb | 0x7E | (ext_hi & 0x01),
        ext_lo,
    ]
}
/// Computes the CRC-32 used by PSI sections: polynomial 0x04C11DB7, initial
/// value 0xFFFFFFFF, bits processed most-significant first, no final XOR.
fn mpegts_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0x04C1_1DB7;
    const TOP_BIT: u32 = 0x8000_0000;

    data.iter().fold(0xFFFF_FFFF_u32, |acc, &byte| {
        // Feed the byte into the top of the register, then shift it through.
        (0..8).fold(acc ^ (u32::from(byte) << 24), |reg, _| {
            let shifted = reg << 1;
            if reg & TOP_BIT == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `buf` is a whole number of 188-byte packets, each starting
    /// with the 0x47 sync byte.
    fn assert_ts_well_formed(buf: &[u8]) {
        assert_eq!(
            buf.len() % 188,
            0,
            "stream not packet-aligned: {} bytes",
            buf.len()
        );
        for (i, chunk) in buf.chunks(188).enumerate() {
            assert_eq!(chunk[0], 0x47, "packet {} missing sync byte", i);
        }
    }

    /// Returns the 13-bit PID of every 188-byte packet in `buf`, in order.
    fn extract_pids(buf: &[u8]) -> Vec<u16> {
        buf.chunks(188)
            .map(|p| u16::from_be_bytes([p[1] & 0x1F, p[2]]))
            .collect()
    }

    /// Builds a one-NAL, 4-byte-length-prefixed frame from `nal`.
    fn length_prefixed(nal: &[u8]) -> Vec<u8> {
        let mut frame = Vec::with_capacity(nal.len() + 4);
        frame.extend_from_slice(&(nal.len() as u32).to_be_bytes());
        frame.extend_from_slice(nal);
        frame
    }

    #[test]
    fn crc32_is_self_validating() {
        let a = [
            0u8, 0xB0, 0x0D, 0x00, 0x01, 0xC1, 0x00, 0x00, 0x00, 0x01, 0xE1, 0x00,
        ];
        let mut b = a;
        b[5] ^= 0x01;
        let crc_a = mpegts_crc32(&a);
        let crc_b = mpegts_crc32(&b);
        // A single flipped bit must change the checksum.
        assert_ne!(crc_a, crc_b);
        // Deterministic for identical input.
        assert_eq!(crc_a, mpegts_crc32(&a));
        let crc_zero = mpegts_crc32(&[0u8; 12]);
        assert_ne!(crc_zero, crc_a);

        // Standard check value for this CRC variant (CRC-32/MPEG-2).
        assert_eq!(mpegts_crc32(b"123456789"), 0x0376_E6E7);

        // The actual self-validating property: running the CRC over a
        // section followed by its own big-endian CRC yields zero.
        let mut framed = a.to_vec();
        framed.extend_from_slice(&crc_a.to_be_bytes());
        assert_eq!(mpegts_crc32(&framed), 0);
    }

    #[test]
    fn video_only_mux_emits_pat_pmt_then_video() {
        let mut sink: Vec<u8> = Vec::new();
        let mut mux = M2tsMux::new(&mut sink);
        let frame = length_prefixed(&[0x40, 0x01, 0x0C, 0x01]);
        mux.write_video(0, &frame).unwrap();
        mux.finish().unwrap();
        // Release the mutable borrow of `sink` before inspecting it.
        drop(mux);

        assert_ts_well_formed(&sink);
        let pids = extract_pids(&sink);
        assert_eq!(pids[0], PID_PAT);
        assert_eq!(pids[1], PID_PMT);
        assert!(pids.iter().any(|p| *p == PID_VIDEO));
    }

    #[test]
    fn audio_track_appears_in_pmt_and_stream() {
        let mut sink: Vec<u8> = Vec::new();
        let mut mux = M2tsMux::new(&mut sink);
        mux.set_audio(AudioCodec::Ac3);
        let frame = length_prefixed(&[0x40, 0x01, 0x0C]);
        mux.write_video(0, &frame).unwrap();
        mux.write_audio(20_000_000, &[0x0B, 0x77, 0x12, 0x34])
            .unwrap();
        mux.finish().unwrap();
        drop(mux);

        assert_ts_well_formed(&sink);
        let pids = extract_pids(&sink);
        assert!(pids.iter().any(|p| *p == PID_VIDEO));
        assert!(pids.iter().any(|p| *p == PID_AUDIO));
    }

    #[test]
    fn psi_re_emits_at_interval() {
        let mut sink: Vec<u8> = Vec::new();
        let mut mux = M2tsMux::new(&mut sink);
        // 60 KiB of payload spans more than PSI_INTERVAL_PACKETS packets.
        let big: Vec<u8> = (0..(60 * 1024)).map(|i| (i & 0xff) as u8).collect();
        let frame = length_prefixed(&big);
        mux.write_video(0, &frame).unwrap();
        mux.finish().unwrap();
        drop(mux);

        assert_ts_well_formed(&sink);
        let pids = extract_pids(&sink);
        let pat_count = pids.iter().filter(|p| **p == PID_PAT).count();
        let pmt_count = pids.iter().filter(|p| **p == PID_PMT).count();
        assert!(pat_count >= 2, "expected ≥2 PAT, got {}", pat_count);
        assert!(pmt_count >= 2, "expected ≥2 PMT, got {}", pmt_count);
    }

    #[test]
    fn continuity_counter_increments_per_pid() {
        let mut sink: Vec<u8> = Vec::new();
        let mut mux = M2tsMux::new(&mut sink);
        for pts in [0i64, 40_000_000, 80_000_000] {
            let frame = length_prefixed(&[0xAA, 0xBB, 0xCC]);
            mux.write_video(pts, &frame).unwrap();
        }
        mux.finish().unwrap();
        drop(mux);

        let ccs: Vec<u8> = sink
            .chunks(188)
            .filter(|p| u16::from_be_bytes([p[1] & 0x1F, p[2]]) == PID_VIDEO)
            .map(|p| p[3] & 0x0F)
            .collect();
        // Every frame produces at least one video packet; without this the
        // loop below could pass vacuously.
        assert!(ccs.len() >= 3, "expected ≥3 video packets, got {}", ccs.len());
        for w in ccs.windows(2) {
            assert_eq!(w[1], (w[0] + 1) & 0x0F);
        }
    }
}