use super::codec::{self, CodecParser};
use super::lookahead::{LookaheadBuffer, LookaheadState, DEFAULT_LOOKAHEAD_SIZE};
use super::mkv::{MkvMuxer, MkvTrack};
use super::ts::TsDemuxer;
use super::{ebml, IOStream, WriteSeek};
use std::io::Seek;
type MkvHeaderResult = io::Result<(crate::disc::DiscTitle, Vec<(u16, Vec<u8>)>)>;
fn skip_bytes(r: &mut impl Read, n: u64) -> io::Result<()> {
io::copy(&mut r.take(n), &mut io::sink())?;
Ok(())
}
use crate::disc::*;
use std::io::{self, Read, Write};
const DEFAULT_MAX_BUFFER: usize = DEFAULT_LOOKAHEAD_SIZE;
#[derive(Debug, Clone, Copy, PartialEq)]
enum WritePhase {
Scanning,
Streaming,
}
struct WriteState {
demuxer: TsDemuxer,
muxer: Option<MkvMuxer<Box<dyn WriteSeek>>>,
writer: Option<Box<dyn WriteSeek>>,
parsers: Vec<(u16, Box<dyn CodecParser>)>,
pid_to_track: Vec<(u16, usize)>,
tracks: Vec<MkvTrack>,
lookahead: LookaheadBuffer,
phase: WritePhase,
video_pending: usize,
}
struct ReadState {
reader: Box<dyn Read>,
buf: Vec<u8>,
pos: usize,
len: usize,
cluster_ts_ms: i64,
codec_privates: Vec<(u16, Vec<u8>)>,
initialized_tracks: Vec<u16>,
}
enum Mode {
Write(Box<WriteState>),
Read(ReadState),
}
pub struct MkvStream {
disc_title: DiscTitle,
mode: Mode,
max_buffer: usize,
finished: bool,
file_size: Option<u64>,
}
impl MkvStream {
pub fn new(writer: impl Write + Seek + 'static) -> Self {
Self {
disc_title: DiscTitle::empty(),
mode: Mode::Write(Box::new(WriteState {
demuxer: TsDemuxer::new(&[]),
muxer: None,
writer: Some(Box::new(writer)),
parsers: Vec::new(),
pid_to_track: Vec::new(),
tracks: Vec::new(),
lookahead: LookaheadBuffer::new(DEFAULT_MAX_BUFFER),
phase: WritePhase::Scanning,
video_pending: 0,
})),
max_buffer: DEFAULT_MAX_BUFFER,
finished: false,
file_size: None,
}
}
pub fn meta(mut self, dt: &DiscTitle) -> Self {
if let Mode::Write(ref mut ws) = self.mode {
let mut pids = Vec::new();
for s in &dt.streams {
let (pid, track, parser) = match s {
crate::disc::Stream::Video(v) => {
if !v.secondary {
ws.video_pending += 1;
}
(v.pid, MkvTrack::video(v), codec::parser_for_codec(v.codec))
}
crate::disc::Stream::Audio(a) => {
(a.pid, MkvTrack::audio(a), codec::parser_for_codec(a.codec))
}
crate::disc::Stream::Subtitle(s) => (
s.pid,
MkvTrack::subtitle(s),
codec::parser_for_codec_with_data(s.codec, s.codec_data.clone()),
),
};
let idx = ws.tracks.len();
pids.push(pid);
ws.pid_to_track.push((pid, idx));
ws.parsers.push((pid, parser));
ws.tracks.push(track);
}
ws.demuxer = TsDemuxer::new(&pids);
}
self.disc_title = dt.clone();
self
}
pub fn max_buffer(mut self, size: usize) -> Self {
self.max_buffer = size;
if let Mode::Write(ref mut ws) = self.mode {
ws.lookahead = LookaheadBuffer::new(size);
}
self
}
pub fn open(mut reader: impl Read + 'static) -> io::Result<Self> {
let (disc_title, codec_privates) = parse_mkv_header(&mut reader)?;
Ok(Self {
disc_title,
mode: Mode::Read(ReadState {
reader: Box::new(reader),
buf: Vec::new(),
pos: 0,
len: 0,
cluster_ts_ms: 0,
codec_privates,
initialized_tracks: Vec::new(),
}),
max_buffer: 0,
finished: false,
file_size: None,
})
}
}
impl crate::pes::Stream for MkvStream {
fn read(&mut self) -> io::Result<Option<crate::pes::PesFrame>> {
let rs = match self.mode {
Mode::Read(ref mut rs) => rs,
Mode::Write(_) => return Err(io::Error::new(io::ErrorKind::Unsupported, "write-only")),
};
loop {
let (id, size, _) = match ebml::read_element_header(&mut rs.reader) {
Ok(h) => h,
Err(_) => return Ok(None),
};
match id {
ebml::CLUSTER => continue,
ebml::CLUSTER_TIMESTAMP => {
rs.cluster_ts_ms = ebml::read_uint_val(&mut rs.reader, size as usize)? as i64;
continue;
}
ebml::SIMPLE_BLOCK => {
let block = ebml::read_binary_val(&mut rs.reader, size as usize)?;
if block.len() < 4 { continue; }
let (track, vl) = block_vint(&block);
if vl + 3 > block.len() { continue; }
let rel_ts = i16::from_be_bytes([block[vl], block[vl + 1]]);
let keyframe = block[vl + 2] & 0x80 != 0;
let data = block[vl + 3..].to_vec();
let pts_ms = rs.cluster_ts_ms + rel_ts as i64;
let track_idx = (track as usize).saturating_sub(1);
if track_idx >= self.disc_title.streams.len() {
continue;
}
return Ok(Some(crate::pes::PesFrame {
track: track_idx,
pts: pts_ms * 1_000_000, keyframe,
data,
}));
}
_ => {
skip_bytes(&mut rs.reader, size)?;
continue;
}
}
}
}
fn write(&mut self, _frame: &crate::pes::PesFrame) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Unsupported, "use MkvOutputStream for writing"))
}
fn finish(&mut self) -> io::Result<()> { Ok(()) }
fn info(&self) -> &crate::disc::DiscTitle { &self.disc_title }
fn codec_private(&self, track: usize) -> Option<Vec<u8>> {
let track_num = (track + 1) as u16; if let Mode::Read(ref rs) = self.mode {
rs.codec_privates
.iter()
.find(|(tn, _)| *tn == track_num)
.map(|(_, data)| data.clone())
} else {
None
}
}
fn headers_ready(&self) -> bool {
true }
}
impl IOStream for MkvStream {
fn info(&self) -> &DiscTitle {
&self.disc_title
}
fn finish(&mut self) -> io::Result<()> {
if self.finished {
return Ok(());
}
self.finished = true;
if let Mode::Write(ref mut ws) = self.mode {
if let Some(ref mut muxer) = ws.muxer {
for pes in &ws.demuxer.flush() {
write_pes(&ws.pid_to_track, &mut ws.parsers, muxer, pes)?;
}
}
if let Some(muxer) = ws.muxer.take() {
muxer.finish()?;
}
}
Ok(())
}
fn total_bytes(&self) -> Option<u64> {
self.file_size
}
}
impl Write for MkvStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let dt = &self.disc_title;
let ws = match self.mode {
Mode::Write(ref mut ws) => ws,
Mode::Read(_) => {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"stream opened for reading",
))
}
};
match ws.phase {
WritePhase::Scanning => {
let packets = ws.demuxer.feed(buf);
for pes in &packets {
if let Some((_, p)) = ws.parsers.iter_mut().find(|(pid, _)| *pid == pes.pid) {
let _ = p.parse(pes);
}
}
let state = ws.lookahead.push(buf);
if check_codec_private(ws) {
ws.lookahead.mark_ready();
begin_streaming(ws, dt)?;
return Ok(buf.len());
}
match state {
LookaheadState::Collecting | LookaheadState::Ready => Ok(buf.len()),
LookaheadState::Overflow => Err(io::Error::new(
io::ErrorKind::OutOfMemory,
"no codec headers found within lookahead buffer",
)),
}
}
WritePhase::Streaming => {
let packets = ws.demuxer.feed(buf);
if let Some(ref mut muxer) = ws.muxer {
for pes in &packets {
write_pes(&ws.pid_to_track, &mut ws.parsers, muxer, pes)?;
}
}
Ok(buf.len())
}
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Read for MkvStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let rs = match self.mode {
Mode::Read(ref mut rs) => rs,
Mode::Write(_) => {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"stream opened for writing",
))
}
};
if rs.pos < rs.len {
let n = (rs.len - rs.pos).min(buf.len());
buf[..n].copy_from_slice(&rs.buf[rs.pos..rs.pos + n]);
rs.pos += n;
return Ok(n);
}
loop {
let (id, size, _) = match ebml::read_element_header(&mut rs.reader) {
Ok(h) => h,
Err(_) => return Ok(0),
};
match id {
ebml::CLUSTER => continue,
ebml::CLUSTER_TIMESTAMP => {
rs.cluster_ts_ms = ebml::read_uint_val(&mut rs.reader, size as usize)? as i64;
continue;
}
ebml::SIMPLE_BLOCK => {
let block = ebml::read_binary_val(&mut rs.reader, size as usize)?;
if block.len() < 4 {
continue;
}
let (track, vl) = block_vint(&block);
if vl + 3 > block.len() {
continue;
}
let rel_ts = i16::from_be_bytes([block[vl], block[vl + 1]]);
let frame = &block[vl + 3..];
let pts_ms = rs.cluster_ts_ms + rel_ts as i64;
let tnum = track as u16;
rs.buf.clear();
if !rs.initialized_tracks.contains(&tnum) {
rs.initialized_tracks.push(tnum);
if let Some((_, cp)) = rs.codec_privates.iter().find(|(t, _)| *t == tnum) {
let annex_b = hvcc_to_annex_b(cp);
if !annex_b.is_empty() {
frame_to_ts(&mut rs.buf, tnum, pts_ms, &annex_b);
}
}
}
frame_to_ts(&mut rs.buf, tnum, pts_ms, frame);
rs.pos = 0;
rs.len = rs.buf.len();
if rs.len > 0 {
let n = rs.len.min(buf.len());
buf[..n].copy_from_slice(&rs.buf[..n]);
rs.pos = n;
return Ok(n);
}
}
_ => {
if size != u64::MAX && size > 0 {
skip_bytes(&mut rs.reader, size)?;
}
}
}
}
}
}
fn check_codec_private(ws: &mut WriteState) -> bool {
if ws.video_pending == 0 {
return true;
}
for (pid, parser) in &ws.parsers {
if let Some(cp) = parser.codec_private() {
if let Some((_, idx)) = ws.pid_to_track.iter().find(|(p, _)| p == pid) {
if ws.tracks[*idx].codec_private.is_none() {
ws.tracks[*idx].codec_private = Some(cp);
ws.video_pending -= 1;
}
}
}
}
ws.video_pending == 0
}
fn begin_streaming(ws: &mut WriteState, dt: &DiscTitle) -> io::Result<()> {
let writer = ws
.writer
.take()
.ok_or_else(|| io::Error::other("writer already consumed"))?;
ws.muxer = Some(MkvMuxer::new_with_chapters(
writer,
&ws.tracks,
Some(&dt.playlist),
dt.duration_secs,
&dt.chapters,
)?);
ws.phase = WritePhase::Streaming;
let pids: Vec<u16> = ws.pid_to_track.iter().map(|(pid, _)| *pid).collect();
let buffered = ws.lookahead.drain();
if !buffered.is_empty() {
let mut temp = TsDemuxer::new(&pids);
let packets = temp.feed(&buffered);
if let Some(ref mut muxer) = ws.muxer {
for pes in &packets {
write_pes(&ws.pid_to_track, &mut ws.parsers, muxer, pes)?;
}
}
}
let remainder = ws.demuxer.take_remainder();
ws.demuxer = TsDemuxer::new(&pids);
ws.demuxer.set_remainder(remainder);
Ok(())
}
fn write_pes(
pid_to_track: &[(u16, usize)],
parsers: &mut [(u16, Box<dyn CodecParser>)],
muxer: &mut MkvMuxer<Box<dyn WriteSeek>>,
pes: &super::ts::PesPacket,
) -> io::Result<()> {
let idx = match pid_to_track.iter().find(|(pid, _)| *pid == pes.pid) {
Some((_, idx)) => *idx,
None => return Ok(()),
};
let parser = match parsers.iter_mut().find(|(pid, _)| *pid == pes.pid) {
Some((_, p)) => p,
None => return Ok(()),
};
for frame in parser.parse(pes) {
muxer.write_frame(idx, frame.pts_ns, frame.keyframe, &frame.data)?;
}
Ok(())
}
fn parse_mkv_header(
r: &mut impl Read,
) -> MkvHeaderResult {
let mut title = String::new();
let mut duration_ms = 0.0f64;
let mut ts_scale: u64 = 1_000_000;
let mut streams: Vec<crate::disc::Stream> = Vec::new();
let mut codec_privates: Vec<(u16, Vec<u8>)> = Vec::new();
let (id, size, _) = ebml::read_element_header(r)?;
if id != ebml::EBML {
return Err(io::Error::new(io::ErrorKind::InvalidData, "not EBML"));
}
if size > i64::MAX as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"EBML header too large",
));
}
skip_bytes(r, size)?;
let (id, _, _) = ebml::read_element_header(r)?;
if id != ebml::SEGMENT {
return Err(io::Error::new(io::ErrorKind::InvalidData, "no Segment"));
}
let (mut got_info, mut got_tracks) = (false, false);
loop {
if got_info && got_tracks {
break;
}
let (id, size, _) = match ebml::read_element_header(r) {
Ok(h) => h,
Err(_) => break,
};
match id {
ebml::INFO => {
let mut remaining = size;
while remaining > 0 {
let (cid, cs, hlen) = ebml::read_element_header(r)?;
remaining = remaining.saturating_sub(hlen as u64 + cs);
match cid {
ebml::TIMESTAMP_SCALE => ts_scale = ebml::read_uint_val(r, cs as usize)?,
ebml::DURATION => duration_ms = ebml::read_float_val(r, cs as usize)?,
ebml::TITLE => title = ebml::read_string_val(r, cs as usize)?,
_ => { skip_bytes(r, cs)?; }
}
}
got_info = true;
}
ebml::TRACKS => {
let mut remaining = size;
while remaining > 0 {
let (cid, cs, hlen) = ebml::read_element_header(r)?;
remaining = remaining.saturating_sub(hlen as u64 + cs);
if cid == ebml::TRACK_ENTRY {
let (stream, tnum, cp) = parse_track(r, cs)?;
if let Some(s) = stream {
streams.push(s);
}
if let Some(cp) = cp {
codec_privates.push((tnum, cp));
}
} else {
skip_bytes(r, cs)?;
}
}
got_tracks = true;
}
ebml::CLUSTER => break,
_ if size != u64::MAX => {
skip_bytes(r, size)?;
}
_ => break,
}
}
let disc_title = DiscTitle {
playlist: title,
duration_secs: duration_ms * (ts_scale as f64) / 1_000_000_000.0,
streams,
..DiscTitle::empty()
};
Ok((disc_title, codec_privates))
}
fn parse_track(
r: &mut impl Read,
size: u64,
) -> io::Result<(Option<crate::disc::Stream>, u16, Option<Vec<u8>>)> {
let (mut ttype, mut tnum) = (0u64, 0u16);
let (mut codec_id, mut lang, mut name) = (String::new(), String::from("und"), String::new());
let (mut ph, mut sr, mut ch, mut forced) = (0u32, 0.0f64, 0u8, false);
let mut codec_priv: Option<Vec<u8>> = None;
let mut remaining = size;
while remaining > 0 {
let (cid, cs, hlen) = ebml::read_element_header(r)?;
remaining = remaining.saturating_sub(hlen as u64 + cs);
match cid {
ebml::TRACK_NUMBER => tnum = ebml::read_uint_val(r, cs as usize)? as u16,
ebml::TRACK_TYPE => ttype = ebml::read_uint_val(r, cs as usize)?,
ebml::CODEC_ID => codec_id = ebml::read_string_val(r, cs as usize)?,
ebml::CODEC_PRIVATE => codec_priv = Some(ebml::read_binary_val(r, cs as usize)?),
ebml::LANGUAGE => lang = ebml::read_string_val(r, cs as usize)?,
ebml::TRACK_NAME => name = ebml::read_string_val(r, cs as usize)?,
ebml::FLAG_FORCED => forced = ebml::read_uint_val(r, cs as usize)? != 0,
ebml::VIDEO => {
let mut vrem = cs;
while vrem > 0 {
let (vid, vs, vhlen) = ebml::read_element_header(r)?;
vrem = vrem.saturating_sub(vhlen as u64 + vs);
if vid == ebml::PIXEL_HEIGHT {
ph = ebml::read_uint_val(r, vs as usize)? as u32;
} else {
skip_bytes(r, vs)?;
}
}
}
ebml::AUDIO => {
let mut arem = cs;
while arem > 0 {
let (aid, as_, ahlen) = ebml::read_element_header(r)?;
arem = arem.saturating_sub(ahlen as u64 + as_);
match aid {
ebml::SAMPLING_FREQUENCY => sr = ebml::read_float_val(r, as_ as usize)?,
ebml::CHANNELS => ch = ebml::read_uint_val(r, as_ as usize)? as u8,
_ => { skip_bytes(r, as_)?; }
}
}
}
_ => { skip_bytes(r, cs)?; }
}
}
let codec = match codec_id.as_str() {
"V_MPEGH/ISO/HEVC" => Codec::Hevc,
"V_MPEG4/ISO/AVC" => Codec::H264,
"V_MS/VFW/FOURCC" => Codec::Vc1,
"V_MPEG2" => Codec::Mpeg2,
"A_AC3" => Codec::Ac3,
"A_EAC3" => Codec::Ac3Plus,
"A_TRUEHD" => Codec::TrueHd,
"A_DTS" => Codec::Dts,
"A_PCM/INT/BIG" => Codec::Lpcm,
"S_HDMV/PGS" => Codec::Pgs,
"S_VOBSUB" => Codec::DvdSub,
_ => Codec::Unknown(0),
};
let res = Resolution::from_height(ph);
let chs = AudioChannels::from_count(ch);
let srs = if sr >= 96000.0 {
SampleRate::S96
} else {
SampleRate::S48
};
let ts_pid = if tnum == 1 { 0x1011 } else { 0x1100 + (tnum - 2) };
let stream = match ttype {
1 => {
let is_secondary = name.contains("Dolby Vision EL") || name.contains("DV EL");
Some(crate::disc::Stream::Video(VideoStream {
pid: ts_pid,
codec,
resolution: res,
frame_rate: FrameRate::Unknown,
hdr: HdrFormat::Sdr,
color_space: ColorSpace::Bt709,
secondary: is_secondary,
label: name,
}))
}
2 => Some(crate::disc::Stream::Audio(AudioStream {
pid: ts_pid,
codec,
channels: chs,
language: lang,
sample_rate: srs,
secondary: false,
label: name,
})),
17 => Some(crate::disc::Stream::Subtitle(SubtitleStream {
pid: ts_pid,
codec,
language: lang,
forced,
codec_data: None,
})),
_ => None,
};
Ok((stream, tnum, codec_priv))
}
fn block_vint(d: &[u8]) -> (u64, usize) {
if d.is_empty() {
return (0, 0);
}
if d[0] & 0x80 != 0 {
return ((d[0] & 0x7F) as u64, 1);
}
if d[0] & 0x40 != 0 && d.len() >= 2 {
return ((((d[0] & 0x3F) as u64) << 8) | d[1] as u64, 2);
}
if d[0] & 0x20 != 0 && d.len() >= 3 {
return ((((d[0] & 0x1F) as u64) << 16) | ((d[1] as u64) << 8) | d[2] as u64, 3);
}
if d[0] & 0x10 != 0 && d.len() >= 4 {
return ((((d[0] & 0x0F) as u64) << 24) | ((d[1] as u64) << 16) | ((d[2] as u64) << 8) | d[3] as u64, 4);
}
(0, 1) }
fn hvcc_to_annex_b(hvcc: &[u8]) -> Vec<u8> {
if hvcc.len() < 23 {
return Vec::new();
}
let num_arrays = hvcc[22] as usize;
let mut pos = 23;
let mut out = Vec::new();
for _ in 0..num_arrays {
if pos + 3 > hvcc.len() {
break;
}
pos += 1; let num_nalus = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize;
pos += 2;
for _ in 0..num_nalus {
if pos + 2 > hvcc.len() {
break;
}
let nal_len = u16::from_be_bytes([hvcc[pos], hvcc[pos + 1]]) as usize;
pos += 2;
if pos + nal_len > hvcc.len() {
break;
}
out.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
out.extend_from_slice(&hvcc[pos..pos + nal_len]);
pos += nal_len;
}
}
out
}
fn frame_to_ts(out: &mut Vec<u8>, track: u16, pts_ms: i64, data: &[u8]) {
let pid = if track == 1 {
0x1011
} else {
0x1100 + (track - 2)
};
let is_video = track <= 1 || pid == 0x1011;
let stream_id: u8 = if is_video { 0xE0 } else { 0xBD };
let pts = encode_pts(pts_ms * 90);
let hdr = [0x00, 0x00, 0x01, stream_id, 0x00, 0x00, 0x80, 0x80, 0x05];
let mut pes = Vec::with_capacity(hdr.len() + pts.len() + data.len());
pes.extend_from_slice(&hdr);
pes.extend_from_slice(&pts);
if is_video && data.len() > 4 {
let mut pos = 0;
while pos + 4 <= data.len() {
let nal_len = u32::from_be_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
pos += 4;
if nal_len == 0 || pos + nal_len > data.len() {
break;
}
pes.extend_from_slice(&[0x00, 0x00, 0x00, 0x01]);
pes.extend_from_slice(&data[pos..pos + nal_len]);
pos += nal_len;
}
} else {
pes.extend_from_slice(data);
}
let mut off = 0;
let mut pusi = true;
while off < pes.len() {
let mut pkt = [0u8; 192];
pkt[4] = 0x47;
pkt[5] = (pid >> 8) as u8 & 0x1F;
if pusi {
pkt[5] |= 0x40;
pusi = false;
}
pkt[6] = pid as u8;
let space = 184;
let rem = pes.len() - off;
let n = rem.min(space);
if n < space {
let pad = space - n;
pkt[7] = 0x30; pkt[8] = pad as u8;
if pad > 1 {
pkt[9] = 0x00;
}
for byte in pkt.iter_mut().take((8 + pad).min(192)).skip(10) {
*byte = 0xFF;
}
pkt[8 + pad..8 + pad + n].copy_from_slice(&pes[off..off + n]);
} else {
pkt[7] = 0x10; pkt[8..8 + n].copy_from_slice(&pes[off..off + n]);
}
out.extend_from_slice(&pkt);
off += n;
}
}
fn encode_pts(pts: i64) -> [u8; 5] {
let p = pts as u64;
[
0x21 | ((p >> 29) & 0x0E) as u8,
((p >> 22) & 0xFF) as u8,
0x01 | ((p >> 14) & 0xFE) as u8,
((p >> 7) & 0xFF) as u8,
0x01 | ((p << 1) & 0xFE) as u8,
]
}