use super::IOStream;
use crate::disc::{
detect_max_batch_sectors, Disc, DiscTitle, Extent, ScanOptions,
};
use crate::drive::Drive;
use crate::error::{Error, Result};
use crate::event::{Event, EventKind};
use std::io::{self, Read, Write};
use std::path::Path;
/// Read-side stream over one title (or the whole surface) of an optical disc.
///
/// Sectors are pulled from `drive` in batches into `read_buf`. The batch is
/// then either handed out as raw bytes through the `Read` impl, or decrypted,
/// demultiplexed and parsed into frames through the `crate::pes::Stream` impl.
pub struct DiscStream {
/// Drive the sectors are read from.
drive: Drive,
/// Title this stream was built for; returned by `info()`.
title: DiscTitle,
/// Keys handed to `decrypt_sectors` on the frame path; `DecryptKeys::None`
/// until `open` installs the disc keys, and again after `set_raw`.
decrypt_keys: crate::decrypt::DecryptKeys,
/// Selects extent-by-extent or plain sequential reading.
mode: ReadMode,
/// Next LBA to read in `ReadMode::Sequential`.
current_lba: u32,
/// Index of the extent currently being read in `ReadMode::Extents`.
current_extent: usize,
/// Sector offset of the next read inside the current extent.
current_offset: u32,
/// Holds the most recently read batch of sectors.
read_buf: Vec<u8>,
/// Number of bytes in `read_buf` that belong to the current batch.
buf_valid: usize,
/// Position of the next byte the `Read` impl will hand out from `read_buf`.
buf_cursor: usize,
/// Upper bound on the number of sectors requested per drive read.
batch_sectors: u16,
/// Error counter exposed to callers; starts at 0.
/// NOTE(review): presumably meant to count failed drive reads — confirm
/// against the code that reads or updates it.
pub errors: u64,
/// Set once a fill attempt reported that no more data is available.
eof: bool,
/// Transport-stream demuxer; `None` when the title has no streams or when
/// the program-stream demuxer is used instead.
ts_demuxer: Option<super::ts::TsDemuxer>,
/// Program-stream demuxer; only installed by `open` for `ContentFormat::MpegPs`.
ps_demuxer: Option<super::ps::PsDemuxer>,
/// One `(pid, parser)` pair per stream of the title.
parsers: Vec<(u16, Box<dyn super::codec::CodecParser>)>,
/// Frames already parsed but not yet returned to the caller.
pending_frames: std::collections::VecDeque<crate::pes::PesFrame>,
/// Maps a PID to the index of its stream in `title.streams`.
pid_to_track: Vec<(u16, usize)>,
}
/// How `DiscStream` walks the disc when it needs more sectors.
enum ReadMode {
/// Read the listed extents one after another, in list order.
Extents(Vec<Extent>),
/// Read consecutive LBAs starting at the stream's `current_lba` and stopping
/// before `capacity` (an exclusive upper bound, in sectors).
Sequential { capacity: u32 },
}
/// Everything `DiscStream::open` produces for the caller.
pub struct DiscOpenResult {
/// Stream positioned at the start of the requested title.
pub stream: DiscStream,
/// Result of the disc scan the title was taken from.
pub disc: Disc,
}
impl DiscStream {
    /// Size in bytes of one disc sector, used for all buffer arithmetic.
    const SECTOR_BYTES: usize = 2048;

    /// Extent reads are issued in multiples of this many sectors.
    /// NOTE(review): presumably one 6144-byte aligned unit — confirm against
    /// the decrypt/demux requirements.
    const EXTENT_READ_ALIGN: u16 = 3;

    /// Open a drive, scan the disc and build a stream for one title.
    ///
    /// * `device` — drive to open; when `None` the first drive returned by
    ///   `crate::drive::find_drive` is used.
    /// * `keydb_path` — optional key database handed to the disc scan.
    /// * `title_index` — index into the scanned `disc.titles`.
    /// * `on_event` — optional progress callback, invoked after each step.
    ///
    /// Failures of `wait_ready`, `init` and `probe_disc` are deliberately not
    /// fatal: the first is ignored and the other two are only reported through
    /// the `InitComplete` / `ProbeComplete` events.
    ///
    /// # Errors
    ///
    /// Returns `Error::DeviceNotFound` (with an empty path) when no drive can
    /// be found, `Error::DiscTitleRange` when `title_index` is out of range,
    /// and whatever `Drive::open` or `Disc::scan` report.
    pub fn open(
        device: Option<&Path>,
        keydb_path: Option<&str>,
        title_index: usize,
        on_event: Option<&dyn Fn(Event)>,
    ) -> Result<DiscOpenResult> {
        let emit = |kind: EventKind| {
            if let Some(cb) = &on_event {
                cb(Event { kind });
            }
        };

        let mut drive = match device {
            Some(path) => Drive::open(path)?,
            None => crate::drive::find_drive().ok_or_else(|| Error::DeviceNotFound {
                path: String::new(),
            })?,
        };
        emit(EventKind::DriveOpened {
            device: drive.device_path().to_string(),
        });

        // Best effort: a drive that never reports ready is still scanned.
        let _ = drive.wait_ready();
        emit(EventKind::DriveReady);

        let init_ok = drive.init().is_ok();
        emit(EventKind::InitComplete { success: init_ok });

        let probe_ok = drive.probe_disc().is_ok();
        emit(EventKind::ProbeComplete { success: probe_ok });

        let scan_opts = match keydb_path {
            Some(kp) => ScanOptions::with_keydb(kp),
            None => ScanOptions::default(),
        };
        let disc = Disc::scan(&mut drive, &scan_opts)?;
        emit(EventKind::ScanComplete {
            titles: disc.titles.len(),
        });

        if title_index >= disc.titles.len() {
            return Err(Error::DiscTitleRange {
                index: title_index,
                count: disc.titles.len(),
            });
        }
        let title = disc.titles[title_index].clone();
        let keys = disc.decrypt_keys();

        let mut stream = Self::title(drive, title);
        stream.decrypt_keys = keys;
        // Program-stream content replaces the transport-stream demuxer that
        // `new` set up by default.
        if disc.content_format == crate::disc::ContentFormat::MpegPs {
            stream.ts_demuxer = None;
            stream.ps_demuxer = Some(super::ps::PsDemuxer::new());
        }
        Ok(DiscOpenResult { stream, disc })
    }

    /// Build a stream that reads the extents of `title` in order.
    ///
    /// The stream starts with `DecryptKeys::None`.
    pub fn title(drive: Drive, title: DiscTitle) -> Self {
        let max_batch = detect_max_batch_sectors(drive.device_path());
        let extents = title.extents.clone();
        Self::new(drive, title, ReadMode::Extents(extents), max_batch)
    }

    /// Build a stream that reads LBAs `0..capacity` sequentially.
    pub fn full_disc(drive: Drive, title: DiscTitle, capacity: u32) -> Self {
        let max_batch = detect_max_batch_sectors(drive.device_path());
        Self::new(drive, title, ReadMode::Sequential { capacity }, max_batch)
    }

    /// Like [`DiscStream::full_disc`], but start reading at `start_lba`.
    pub fn full_disc_resume(drive: Drive, title: DiscTitle, capacity: u32, start_lba: u32) -> Self {
        let max_batch = detect_max_batch_sectors(drive.device_path());
        let mut stream = Self::new(drive, title, ReadMode::Sequential { capacity }, max_batch);
        stream.current_lba = start_lba;
        stream
    }

    /// Shared constructor: registers one codec parser per stream of `title`
    /// and creates a transport-stream demuxer for their PIDs (none when the
    /// title has no streams).
    fn new(drive: Drive, title: DiscTitle, mode: ReadMode, max_batch: u16) -> Self {
        let mut pids = Vec::new();
        let mut parsers = Vec::new();
        let mut pid_to_track = Vec::new();
        for (idx, stream) in title.streams.iter().enumerate() {
            let (pid, codec) = match stream {
                crate::disc::Stream::Video(v) => (v.pid, v.codec),
                crate::disc::Stream::Audio(a) => (a.pid, a.codec),
                crate::disc::Stream::Subtitle(s) => (s.pid, s.codec),
            };
            pids.push(pid);
            pid_to_track.push((pid, idx));
            parsers.push((pid, super::codec::parser_for_codec(codec)));
        }
        let ts_demuxer = if pids.is_empty() {
            None
        } else {
            Some(super::ts::TsDemuxer::new(&pids))
        };

        Self {
            drive,
            title,
            decrypt_keys: crate::decrypt::DecryptKeys::None,
            mode,
            current_lba: 0,
            current_extent: 0,
            current_offset: 0,
            read_buf: Vec::with_capacity(usize::from(max_batch) * Self::SECTOR_BYTES),
            buf_valid: 0,
            buf_cursor: 0,
            batch_sectors: max_batch,
            errors: 0,
            eof: false,
            ts_demuxer,
            ps_demuxer: None,
            parsers,
            pending_frames: std::collections::VecDeque::new(),
            pid_to_track,
        }
    }

    /// Drop the decryption keys so sectors are passed through undecrypted.
    pub fn set_raw(&mut self) {
        self.decrypt_keys = crate::decrypt::DecryptKeys::None;
    }

    /// Forward to `Drive::lock_tray`.
    pub fn lock_tray(&mut self) {
        self.drive.lock_tray();
    }

    /// Forward to `Drive::unlock_tray`.
    pub fn unlock_tray(&mut self) {
        self.drive.unlock_tray();
    }

    /// Consume the stream and hand back the underlying drive.
    pub fn into_drive(self) -> Drive {
        self.drive
    }

    /// Read the next batch of sectors into `read_buf` according to `mode`.
    ///
    /// Returns `false` when no further data could be read.
    fn fill(&mut self) -> bool {
        match &self.mode {
            ReadMode::Extents(_) => self.fill_extents(),
            ReadMode::Sequential { .. } => self.fill_sequential(),
        }
    }

    /// Read the next batch from the current extent, advancing to the following
    /// extent(s) when the current one has nothing left to read.
    ///
    /// Returns `false` when all extents are exhausted or the drive read fails;
    /// a failed read is additionally counted in `errors` so callers can tell a
    /// truncated stream from a clean end.
    fn fill_extents(&mut self) -> bool {
        // A loop rather than recursion: every skipped extent used to add a
        // stack frame.
        loop {
            let (ext_start, ext_sectors) = match &self.mode {
                ReadMode::Extents(exts) => match exts.get(self.current_extent) {
                    Some(ext) => (ext.start_lba, ext.sector_count),
                    None => return false,
                },
                _ => unreachable!(),
            };

            let remaining = ext_sectors.saturating_sub(self.current_offset);
            // Bounded by `batch_sectors`, so the narrowing cast cannot truncate.
            let batch = remaining.min(u32::from(self.batch_sectors)) as u16;
            // Round down to the read alignment.
            // NOTE(review): this drops a trailing 1-2 sectors of an extent whose
            // length is not a multiple of the alignment, and skips every extent
            // when `batch_sectors` is below the alignment.
            let sectors = batch - (batch % Self::EXTENT_READ_ALIGN);
            if sectors == 0 {
                self.current_extent += 1;
                self.current_offset = 0;
                continue;
            }

            let lba = ext_start + self.current_offset;
            let bytes = usize::from(sectors) * Self::SECTOR_BYTES;
            self.read_buf.resize(bytes, 0);
            return match self.drive.read(lba, sectors, &mut self.read_buf[..bytes]) {
                Ok(_) => {
                    self.buf_valid = bytes;
                    self.buf_cursor = 0;
                    self.current_offset += u32::from(sectors);
                    if self.current_offset >= ext_sectors {
                        self.current_extent += 1;
                        self.current_offset = 0;
                    }
                    true
                }
                Err(_) => {
                    // Surface the failure instead of silently reporting EOF.
                    self.errors += 1;
                    false
                }
            };
        }
    }

    /// Read the next batch of consecutive sectors starting at `current_lba`.
    ///
    /// Returns `false` once `current_lba` reaches the capacity, when the batch
    /// size is zero, or when the drive read fails; a failed read is counted in
    /// `errors`.
    fn fill_sequential(&mut self) -> bool {
        let capacity = match &self.mode {
            ReadMode::Sequential { capacity } => *capacity,
            _ => unreachable!(),
        };
        if self.current_lba >= capacity {
            return false;
        }

        let remaining = capacity - self.current_lba;
        // Bounded by `batch_sectors`, so the narrowing cast cannot truncate.
        let count = remaining.min(u32::from(self.batch_sectors)) as u16;
        if count == 0 {
            // A zero batch size can never make progress; reporting success
            // here would make the frame reader loop forever.
            return false;
        }

        let bytes = usize::from(count) * Self::SECTOR_BYTES;
        self.read_buf.resize(bytes, 0);
        match self
            .drive
            .read(self.current_lba, count, &mut self.read_buf[..bytes])
        {
            Ok(_) => {
                self.buf_valid = bytes;
                self.buf_cursor = 0;
                self.current_lba += u32::from(count);
                true
            }
            Err(_) => {
                // Surface the failure instead of silently reporting EOF.
                self.errors += 1;
                false
            }
        }
    }
}
impl IOStream for DiscStream {
    /// Title this stream was created for.
    fn info(&self) -> &DiscTitle {
        &self.title
    }

    /// Unlock the drive tray; always succeeds.
    fn finish(&mut self) -> io::Result<()> {
        self.drive.unlock_tray();
        Ok(())
    }

    /// Total size of the selected read range in bytes (2048 bytes per sector):
    /// the sum of all extent lengths, or the full capacity in sequential mode.
    fn total_bytes(&self) -> Option<u64> {
        let sectors: u64 = match &self.mode {
            ReadMode::Sequential { capacity } => u64::from(*capacity),
            ReadMode::Extents(extents) => extents
                .iter()
                .map(|extent| u64::from(extent.sector_count))
                .sum(),
        };
        Some(sectors * 2048)
    }

    /// Copy of the decryption keys currently in use.
    fn keys(&self) -> crate::decrypt::DecryptKeys {
        self.decrypt_keys.clone()
    }
}
impl crate::pes::Stream for DiscStream {
    /// Return the next parsed frame, reading more sectors when the queue is empty.
    ///
    /// Queued frames are returned first. Otherwise batches are read, decrypted
    /// in place and fed to the active demuxer until at least one frame is
    /// produced. `Ok(None)` signals the end of the stream; it is also returned
    /// when a fill attempt fails. A decryption failure is returned as an
    /// `io::Error` carrying the error's text.
    fn read(&mut self) -> io::Result<Option<crate::pes::PesFrame>> {
        if let Some(queued) = self.pending_frames.pop_front() {
            return Ok(Some(queued));
        }
        if self.eof {
            return Ok(None);
        }

        loop {
            if !self.fill() {
                self.eof = true;
                return Ok(None);
            }

            let len = self.buf_valid;
            crate::decrypt::decrypt_sectors(&mut self.read_buf[..len], &self.decrypt_keys, 0)
                .map_err(|e| io::Error::other(e.to_string()))?;

            if let Some(demuxer) = self.ts_demuxer.as_mut() {
                let packets = demuxer.feed(&self.read_buf[..len]);
                for pes in &packets {
                    // Packets on PIDs without a known track or parser are dropped.
                    let Some(&(_, track)) =
                        self.pid_to_track.iter().find(|entry| entry.0 == pes.pid)
                    else {
                        continue;
                    };
                    let Some((_, parser)) =
                        self.parsers.iter_mut().find(|entry| entry.0 == pes.pid)
                    else {
                        continue;
                    };
                    for frame in parser.parse(pes) {
                        self.pending_frames
                            .push_back(crate::pes::PesFrame::from_codec_frame(track, frame));
                    }
                }
            } else if let Some(demuxer) = self.ps_demuxer.as_mut() {
                let packets = demuxer.feed(&self.read_buf[..len]);
                for ps in &packets {
                    // Fixed stream-id to track mapping: 0xE0..=0xEF -> 0,
                    // 0xC0..=0xDF -> 1, 0xBD -> low five bits of the
                    // sub-stream id plus one (1 when the id is absent).
                    let track = match ps.stream_id {
                        0xE0..=0xEF => 0,
                        0xC0..=0xDF => 1,
                        0xBD => ps
                            .sub_stream_id
                            .map_or(1, |id| (id & 0x1F) as usize + 1),
                        _ => continue,
                    };
                    if track >= self.title.streams.len() {
                        continue;
                    }
                    // Scale the timestamp by 1e9 / 90_000 (90 kHz ticks to ns).
                    let pts_ns = ps
                        .pts
                        .map_or(0, |ticks| (ticks as i64) * 1_000_000_000 / 90_000);
                    self.pending_frames.push_back(crate::pes::PesFrame {
                        track,
                        pts: pts_ns,
                        keyframe: true,
                        data: ps.data.clone(),
                    });
                }
            }

            // The batch is fully consumed by the demuxer.
            self.buf_valid = 0;
            self.buf_cursor = 0;

            if let Some(frame) = self.pending_frames.pop_front() {
                return Ok(Some(frame));
            }
        }
    }

    /// Always fails: a disc cannot be written to.
    fn write(&mut self, _frame: &crate::pes::PesFrame) -> io::Result<()> {
        let read_only = io::Error::new(io::ErrorKind::Unsupported, "disc is read-only");
        Err(read_only)
    }

    /// Unlock the drive tray; always succeeds.
    fn finish(&mut self) -> io::Result<()> {
        self.drive.unlock_tray();
        Ok(())
    }

    /// Title this stream was created for.
    fn info(&self) -> &DiscTitle {
        &self.title
    }

    /// Codec private data reported by the parser of `track`, if the track is
    /// known and its parser has any.
    fn codec_private(&self, track: usize) -> Option<Vec<u8>> {
        let &(pid, _) = self.pid_to_track.iter().find(|entry| entry.1 == track)?;
        let (_, parser) = self.parsers.iter().find(|entry| entry.0 == pid)?;
        parser.codec_private()
    }

    /// `true` once every non-secondary video stream has codec private data.
    fn headers_ready(&self) -> bool {
        self.title
            .streams
            .iter()
            .enumerate()
            .all(|(idx, stream)| match stream {
                crate::disc::Stream::Video(v) => {
                    v.secondary || self.codec_private(idx).is_some()
                }
                _ => true,
            })
    }
}
impl Read for DiscStream {
    /// Copy raw sector bytes into `buf`.
    ///
    /// Bytes left over from the current batch are served first; only when the
    /// batch is used up is a new one read from the drive. Returns `Ok(0)` once
    /// a fill attempt has failed. No decryption is applied on this path.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let start = if self.buf_cursor < self.buf_valid {
            self.buf_cursor
        } else {
            if self.eof {
                return Ok(0);
            }
            if !self.fill() {
                self.eof = true;
                return Ok(0);
            }
            // A fresh batch is always consumed from its beginning.
            0
        };

        let count = (self.buf_valid - start).min(buf.len());
        let end = start + count;
        buf[..count].copy_from_slice(&self.read_buf[start..end]);
        self.buf_cursor = end;
        Ok(count)
    }
}
impl Write for DiscStream {
    /// Always fails with `ErrorKind::Unsupported`: a disc cannot be written to.
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        let read_only = io::Error::new(io::ErrorKind::Unsupported, "disc is read-only");
        Err(read_only)
    }

    /// Nothing is ever buffered for writing, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}