use super::codec::CodecParser;
use super::demux_thread::{DemuxBatch, DemuxThread};
use super::ts::PesPacket;
use crate::disc::DiscTitle;
use crate::pes::{PesFrame, Stream};
use crossbeam_channel::Receiver;
use std::io;
/// Pull-style frame source fed by a demux worker over a channel.
///
/// Batches of demuxed packets arrive on `demux_rx`, are run through the
/// per-PID codec parsers, and the resulting frames are queued in
/// `pending_frames` until a caller asks for them.
pub struct PipelinedPesStream {
/// Title description handed back by `info()`; its `streams` list bounds the
/// valid track indices and is scanned to decide when headers are ready.
title: DiscTitle,
/// `(pid, parser)` pairs, searched linearly by PID.
parsers: Vec<(u16, Box<dyn CodecParser>)>,
/// `(pid, track index)` pairs, searched linearly in either direction.
pid_to_track: Vec<(u16, usize)>,
/// Receiving end of the channel carrying `DemuxBatch` values.
// NOTE(review): declared before `demux_thread`, so Rust drops the receiver
// first; whether the worker relies on that to shut down is not visible here.
demux_rx: Receiver<DemuxBatch>,
/// Handle stored at construction and never read again in this file, hence the
/// lint allowance.
// NOTE(review): presumably held so the worker lives as long as the stream —
// confirm against `DemuxThread`'s `Drop` implementation.
#[allow(dead_code)]
demux_thread: DemuxThread,
/// FIFO of parsed frames not yet returned to the caller.
pending_frames: std::collections::VecDeque<PesFrame>,
/// Set once the channel reports that no further batches will arrive.
eof: bool,
}
impl PipelinedPesStream {
pub fn new(
demux_thread: DemuxThread,
demux_rx: Receiver<DemuxBatch>,
title: DiscTitle,
parsers: Vec<(u16, Box<dyn CodecParser>)>,
pid_to_track: Vec<(u16, usize)>,
) -> Self {
Self {
title,
parsers,
pid_to_track,
demux_rx,
demux_thread,
pending_frames: std::collections::VecDeque::new(),
eof: false,
}
}
fn pump_one_batch(&mut self) -> io::Result<bool> {
match self.demux_rx.recv() {
Ok(DemuxBatch::Ts(packets)) => {
self.consume_ts(packets);
Ok(true)
}
Ok(DemuxBatch::Ps(packets)) => {
self.consume_ps(packets);
Ok(true)
}
Ok(DemuxBatch::Err(e)) => Err(e),
Err(_) => Ok(false),
}
}
fn consume_ts(&mut self, packets: Vec<PesPacket>) {
let skip_parse = std::env::var_os("FREEMKV_SKIP_PARSE").is_some();
for pes in packets {
if let Some((_, track)) = self
.pid_to_track
.iter()
.find(|(pid, _)| *pid == pes.pid)
.copied()
{
if skip_parse {
self.pending_frames.push_back(PesFrame {
track,
pts: pes.pts.map(super::codec::pts_to_ns).unwrap_or(0),
keyframe: false,
data: pes.data,
});
} else if let Some((_, parser)) =
self.parsers.iter_mut().find(|(pid, _)| *pid == pes.pid)
{
for frame in parser.parse(&pes) {
self.pending_frames
.push_back(PesFrame::from_codec_frame(track, frame));
}
}
}
}
}
fn consume_ps(&mut self, packets: Vec<super::ps::PsPacket>) {
for ps in packets {
let track = match ps.stream_id {
0xE0..=0xEF => 0,
0xC0..=0xDF => 1,
0xBD => ps
.sub_stream_id
.map(|s| (s & 0x1F) as usize + 1)
.unwrap_or(1),
_ => continue,
};
if track >= self.title.streams.len() {
continue;
}
let pid = self
.pid_to_track
.iter()
.find(|(_, idx)| *idx == track)
.map(|(p, _)| *p)
.unwrap_or(0);
let pes = PesPacket {
pid,
pts: ps.pts.map(|p| p as i64),
dts: ps.dts.map(|d| d as i64),
data: ps.data,
};
if let Some((_, parser)) = self.parsers.iter_mut().find(|(p, _)| *p == pid) {
for frame in parser.parse(&pes) {
self.pending_frames
.push_back(PesFrame::from_codec_frame(track, frame));
}
}
}
}
}
impl Stream for PipelinedPesStream {
    /// Returns the next frame, blocking on the demux channel as needed.
    ///
    /// Yields `Ok(None)` once the channel has ended and the queue is empty;
    /// errors forwarded by the demux side are returned as `Err`.
    fn read(&mut self) -> io::Result<Option<PesFrame>> {
        loop {
            // Always serve queued frames before touching the channel.
            if let Some(next) = self.pending_frames.pop_front() {
                return Ok(Some(next));
            }
            if self.eof {
                return Ok(None);
            }
            // A batch may produce no frames at all, so keep pumping until one
            // shows up or the channel reports that it is finished.
            if !self.pump_one_batch()? {
                self.eof = true;
            }
        }
    }

    /// This stream is read-only; writing always fails with `StreamReadOnly`.
    fn write(&mut self, _: &PesFrame) -> io::Result<()> {
        Err(crate::error::Error::StreamReadOnly.into())
    }

    /// Nothing to flush or finalize on the read side.
    fn finish(&mut self) -> io::Result<()> {
        Ok(())
    }

    /// Returns the title this stream was created for.
    fn info(&self) -> &DiscTitle {
        &self.title
    }

    /// Reports whether every non-secondary video track has codec private
    /// data available. Always `true` when `FREEMKV_SKIP_PARSE` is set.
    fn headers_ready(&self) -> bool {
        let skip_parse = std::env::var_os("FREEMKV_SKIP_PARSE").is_some();
        skip_parse
            || self
                .title
                .streams
                .iter()
                .enumerate()
                .all(|(track, stream)| match stream {
                    crate::disc::Stream::Video(video) => {
                        video.secondary || self.codec_private(track).is_some()
                    }
                    _ => true,
                })
    }

    /// Returns the codec private data reported by the parser that handles
    /// `track`, or `None` when the track has no PID mapping, no parser is
    /// registered for that PID, or the parser has nothing to report.
    fn codec_private(&self, track: usize) -> Option<Vec<u8>> {
        let &(pid, _) = self
            .pid_to_track
            .iter()
            .find(|&&(_, mapped_track)| mapped_track == track)?;
        let (_, parser) = self
            .parsers
            .iter()
            .find(|(parser_pid, _)| *parser_pid == pid)?;
        parser.codec_private()
    }
}