use crate::*;
use anyhow::{bail, Result};
use core::mem::size_of;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::sync::{Arc, OnceLock};
use sync_file::{RandomAccessFile, ReadAt};
use tracing::{debug, debug_span, info, info_span, trace, trace_span};
use zerocopy::IntoBytes;
/// Reader for an MSFZ ("PDZ") container file, providing random access to its
/// numbered streams. `F` is the underlying random-access file handle.
pub struct Msfz<F = RandomAccessFile> {
    // Underlying file; all reads go through the `ReadAt` trait.
    file: F,
    // All stream fragments, concatenated in stream order.
    fragments: Vec<Fragment>,
    // For stream `i`, its fragments are
    // `fragments[stream_fragments[i]..stream_fragments[i + 1]]`.
    // Contains one trailing end-of-list entry, so len == num_streams + 1.
    stream_fragments: Vec<u32>,
    // On-disk chunk table describing each compressed chunk.
    chunk_table: Box<[ChunkEntry]>,
    // Lazily-populated decompressed chunk data, one slot per chunk table entry.
    chunk_cache: Vec<OnceLock<Arc<[u8]>>>,
}
/// One contiguous piece of a stream's data, stored either directly in the
/// file (uncompressed) or inside a compressed chunk.
#[derive(Clone)]
struct Fragment {
    // Size of this fragment in bytes.
    size: u32,
    // Where this fragment's bytes are stored.
    location: FragmentLocation,
}
/// Compact single-line rendering, e.g. `size 0x00040 at chunk 3 : 0x0010`.
impl std::fmt::Debug for Fragment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { size, location } = self;
        write!(f, "size 0x{size:05x} at {location:?}")
    }
}
impl std::fmt::Debug for FragmentLocation {
    /// Renders the location as `nil`, `chunk N : 0xOFF`, or
    /// `uncompressed at 0xOFF`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_nil() {
            f.write_str("nil")
        } else if self.is_compressed() {
            // Bug fix: the compressed and uncompressed branches were swapped.
            // The old code printed "uncompressed at ..." for compressed
            // locations and called the accessors whose debug_asserts reject
            // that state (panicking in debug builds).
            write!(
                f,
                "chunk {} : 0x{:04x}",
                self.compressed_first_chunk(),
                self.compressed_offset_within_chunk()
            )
        } else {
            write!(
                f,
                "uncompressed at 0x{:06x}",
                self.uncompressed_file_offset()
            )
        }
    }
}
/// When set in `hi`, the fragment's payload lives inside a compressed chunk.
const FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK: u32 = 1u32 << 31;

/// Packed 64-bit location of a stream fragment's data.
///
/// `(u32::MAX, u32::MAX)` is the nil sentinel. When the high bit of `hi` is
/// set, the payload is inside a compressed chunk (`hi` holds the chunk index,
/// `lo` the offset within the chunk); otherwise `hi:lo` form a 64-bit
/// uncompressed file offset.
#[derive(Copy, Clone)]
struct FragmentLocation {
    lo: u32,
    hi: u32,
}

impl FragmentLocation {
    /// Sentinel marking a nil (absent) stream.
    const NIL: Self = Self {
        lo: u32::MAX,
        hi: u32::MAX,
    };

    /// True when both words hold the all-ones nil sentinel.
    fn is_nil(&self) -> bool {
        (self.lo, self.hi) == (u32::MAX, u32::MAX)
    }

    /// True when the payload is stored inside a compressed chunk.
    fn is_compressed(&self) -> bool {
        (self.hi & FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK) != 0
    }

    /// Index of the chunk holding the payload (compressed locations only).
    fn compressed_first_chunk(&self) -> u32 {
        debug_assert!(!self.is_nil());
        debug_assert!(self.is_compressed());
        self.hi & !FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK
    }

    /// Byte offset of the payload within its chunk (compressed locations only).
    fn compressed_offset_within_chunk(&self) -> u32 {
        debug_assert!(!self.is_nil());
        debug_assert!(self.is_compressed());
        self.lo
    }

    /// Absolute file offset of the payload (uncompressed locations only).
    fn uncompressed_file_offset(&self) -> u64 {
        debug_assert!(!self.is_nil());
        debug_assert!(!self.is_compressed());
        u64::from(self.hi) << 32 | u64::from(self.lo)
    }
}
impl Msfz<RandomAccessFile> {
    /// Opens an MSFZ file from disk and parses its header, stream directory,
    /// and chunk table.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::from_file(RandomAccessFile::from(File::open(path)?))
    }
}
impl<F: ReadAt> Msfz<F> {
    /// Parses the MSFZ header, stream directory, and chunk table from `file`
    /// and returns a reader over the container.
    ///
    /// Fails if the signature or version is wrong, or if the header's size
    /// fields are inconsistent.
    pub fn from_file(file: F) -> Result<Self> {
        let _span = info_span!("Msfz::from_file").entered();
        // Read the fixed-size file header at offset 0.
        let mut header: MsfzFileHeader = MsfzFileHeader::new_zeroed();
        file.read_exact_at(header.as_mut_bytes(), 0)?;
        if header.signature != MSFZ_FILE_SIGNATURE {
            bail!("This file does not have a PDZ file signature.");
        }
        if header.version.get() != MSFZ_FILE_VERSION_V0 {
            bail!("This PDZ file uses a version number that is not supported.");
        }
        let num_streams = header.num_streams.get();
        if num_streams == 0 {
            bail!("The stream directory is invalid; it is empty.");
        }
        let stream_dir_size_uncompressed = header.stream_dir_size_uncompressed.get() as usize;
        let stream_dir_size_compressed = header.stream_dir_size_compressed.get() as usize;
        let stream_dir_file_offset = header.stream_dir_offset.get();
        let stream_dir_compression = header.stream_dir_compression.get();
        info!(
            num_streams,
            stream_dir_size_uncompressed,
            stream_dir_size_compressed,
            stream_dir_compression,
            stream_dir_file_offset,
            "reading stream directory"
        );
        // Destination buffer for the (uncompressed) stream directory image.
        let mut stream_dir_bytes: Vec<u8> =
            map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_uncompressed))?;
        if let Some(compression) = Compression::try_from_code_opt(stream_dir_compression)? {
            // Directory is stored compressed: read the compressed image, then
            // decompress it into `stream_dir_bytes`.
            let mut compressed_stream_dir: Vec<u8> =
                map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_compressed))?;
            file.read_exact_at(
                compressed_stream_dir.as_mut_bytes(),
                header.stream_dir_offset.get(),
            )?;
            debug!("decompressing stream directory");
            crate::compress_utils::decompress_to_slice(
                compression,
                &compressed_stream_dir,
                &mut stream_dir_bytes,
            )?;
        } else {
            // Directory is stored uncompressed: read it directly.
            file.read_exact_at(stream_dir_bytes.as_mut_bytes(), stream_dir_file_offset)?;
        }
        let num_chunks = header.num_chunks.get() as usize;
        let chunk_index_size = header.chunk_table_size.get() as usize;
        // The chunk table is a plain array of `ChunkEntry`; its declared byte
        // size must match the declared chunk count exactly.
        if chunk_index_size != num_chunks * size_of::<ChunkEntry>() {
            bail!("This PDZ file is invalid. num_chunks and chunk_index_size are not consistent.");
        }
        let chunk_table_offset = header.chunk_table_offset.get();
        let mut chunk_table: Box<[ChunkEntry]> =
            map_alloc_error(FromZeros::new_box_zeroed_with_elems(num_chunks))?;
        if num_chunks != 0 {
            info!(
                num_chunks,
                chunk_table_offset, "reading compressed chunk table"
            );
            file.read_exact_at(chunk_table.as_mut_bytes(), chunk_table_offset)?;
        } else {
            // No chunks: the empty (zeroed) table is already correct.
        }
        // One empty cache slot per chunk; chunks are decompressed on demand.
        let mut chunk_cache = Vec::with_capacity(num_chunks);
        chunk_cache.resize_with(num_chunks, Default::default);
        let stream_dir = decode_stream_dir(&stream_dir_bytes, num_streams, &chunk_table)?;
        Ok(Self {
            file,
            fragments: stream_dir.fragments,
            stream_fragments: stream_dir.stream_fragments,
            chunk_table,
            chunk_cache,
        })
    }
    /// Number of streams in this container.
    pub fn num_streams(&self) -> u32 {
        // `stream_fragments` carries one trailing end-of-list entry.
        (self.stream_fragments.len() - 1) as u32
    }
    /// Like [`Self::stream_fragments`], but converts an out-of-range stream
    /// index into an error.
    fn stream_fragments_result(&self, stream: u32) -> Result<&[Fragment]> {
        self.stream_fragments(stream)
            .ok_or_else(|| anyhow::anyhow!("Stream index is out of range"))
    }
    /// Returns the fragment list for `stream`, or `None` if the index is out
    /// of range. A nil stream is reported as an empty fragment list.
    fn stream_fragments(&self, stream: u32) -> Option<&[Fragment]> {
        let i = stream as usize;
        if i < self.stream_fragments.len() - 1 {
            let start = self.stream_fragments[i] as usize;
            let end = self.stream_fragments[i + 1] as usize;
            let fragments = &self.fragments[start..end];
            match fragments {
                // A single fragment with a nil location marks a nil stream.
                [f, ..] if f.location.is_nil() => Some(&[]),
                _ => Some(fragments),
            }
        } else {
            None
        }
    }
    /// Total size in bytes of `stream` (the sum of its fragment sizes).
    pub fn stream_size(&self, stream: u32) -> Result<u64> {
        let fragments = self.stream_fragments_result(stream)?;
        Ok(fragments.iter().map(|f| f.size as u64).sum())
    }
    /// Reports whether `stream` is an in-range, non-nil stream.
    /// Stream 0 is always reported invalid.
    pub fn is_stream_valid(&self, stream: u32) -> bool {
        assert!(!self.stream_fragments.is_empty());
        if stream == 0 {
            // NOTE(review): stream 0 is unconditionally rejected here;
            // presumably it is reserved (as in classic MSF) -- confirm.
            return false;
        }
        let i = stream as usize;
        if i < self.stream_fragments.len() - 1 {
            let start = self.stream_fragments[i] as usize;
            let end = self.stream_fragments[i + 1] as usize;
            let fragments = &self.fragments[start..end];
            match fragments {
                // A leading nil fragment means the stream is nil.
                [f, ..] if f.location.is_nil() => false,
                _ => true,
            }
        } else {
            false
        }
    }
    /// Returns `size` bytes starting at `offset` within the decompressed data
    /// of chunk `chunk`, or `InvalidData` if the range is out of bounds.
    fn get_chunk_slice(&self, chunk: u32, offset: u32, size: u32) -> std::io::Result<&[u8]> {
        let chunk_data = self.get_chunk_data(chunk)?;
        if let Some(slice) = chunk_data.get(offset as usize..offset as usize + size as usize) {
            Ok(slice)
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "PDZ file contains invalid byte ranges within a chunk",
            ))
        }
    }
    /// Returns the decompressed data for `chunk_index`, loading and caching
    /// it on first access.
    fn get_chunk_data(&self, chunk_index: u32) -> std::io::Result<&Arc<[u8]>> {
        let _span = trace_span!("get_chunk_data").entered();
        trace!(chunk_index);
        debug_assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
        let Some(slot) = self.chunk_cache.get(chunk_index as usize) else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Chunk index is out of range.",
            ));
        };
        // Fast path: chunk already decompressed and published.
        if let Some(arc) = slot.get() {
            trace!(chunk_index, "found chunk in cache");
            return Ok(arc);
        }
        // Not cached yet: load (and decompress), then publish. If another
        // thread raced us here, `get_or_init` keeps the first stored value.
        let arc = self.load_chunk_data(chunk_index)?;
        Ok(slot.get_or_init(move || arc))
    }
    /// Reads chunk `chunk_index` from the file and decompresses it when its
    /// table entry names a compression algorithm. Caching is the caller's
    /// responsibility (see [`Self::get_chunk_data`]).
    #[inline(never)]
    fn load_chunk_data(&self, chunk_index: u32) -> std::io::Result<Arc<[u8]>> {
        assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
        let _span = debug_span!("load_chunk_data").entered();
        let entry = &self.chunk_table[chunk_index as usize];
        let compression_opt =
            Compression::try_from_code_opt(entry.compression.get()).map_err(|_| {
                std::io::Error::new(
                    std::io::ErrorKind::Unsupported,
                    "Chunk uses an unrecognized compression algorithm",
                )
            })?;
        let mut compressed_data: Box<[u8]> =
            FromZeros::new_box_zeroed_with_elems(entry.compressed_size.get() as usize)
                .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
        self.file
            .read_exact_at(&mut compressed_data, entry.file_offset.get())?;
        // `None` compression means the chunk is stored uncompressed; its raw
        // bytes are used as-is.
        let uncompressed_data: Box<[u8]> = if let Some(compression) = compression_opt {
            let mut uncompressed_data: Box<[u8]> =
                FromZeros::new_box_zeroed_with_elems(entry.uncompressed_size.get() as usize)
                    .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
            self::compress_utils::decompress_to_slice(
                compression,
                &compressed_data,
                &mut uncompressed_data,
            )?;
            uncompressed_data
        } else {
            compressed_data
        };
        Ok(Arc::from(uncompressed_data))
    }
    /// Reads the entire contents of `stream`.
    ///
    /// When the stream is a single fragment inside one chunk, the cached
    /// chunk buffer is shared (`StreamData::ArcSlice`) instead of copied;
    /// otherwise the fragments are assembled into a fresh buffer.
    pub fn read_stream(&self, stream: u32) -> anyhow::Result<StreamData> {
        let _span = trace_span!("read_stream_to_cow").entered();
        trace!(stream);
        let mut fragments = self.stream_fragments_result(stream)?;
        // A leading nil fragment marks a nil stream; treat it as empty.
        match fragments.first() {
            Some(f) if f.location.is_nil() => fragments = &[],
            _ => {}
        }
        if fragments.is_empty() {
            return Ok(StreamData::empty());
        }
        // Fast path: one compressed fragment -> share the chunk buffer.
        if fragments.len() == 1 && fragments[0].location.is_compressed() {
            let chunk_index = fragments[0].location.compressed_first_chunk();
            let offset_within_chunk = fragments[0].location.compressed_offset_within_chunk();
            let chunk_data = self.get_chunk_data(chunk_index)?;
            let fragment_range = offset_within_chunk as usize
                ..offset_within_chunk as usize + fragments[0].size as usize;
            // Validate the range before handing it out.
            if chunk_data.get(fragment_range.clone()).is_none() {
                bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
            }
            return Ok(StreamData::ArcSlice(Arc::clone(chunk_data), fragment_range));
        }
        // Slow path: concatenate all fragments into one owned buffer.
        let stream_size: u32 = fragments.iter().map(|f| f.size).sum();
        let stream_usize = stream_size as usize;
        let mut output_buffer: Box<[u8]> = FromZeros::new_box_zeroed_with_elems(stream_usize)
            .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
        // `output_slice` is the not-yet-written tail of `output_buffer`.
        let mut output_slice: &mut [u8] = &mut output_buffer;
        for fragment in fragments.iter() {
            let stream_offset = stream_usize - output_slice.len();
            let (fragment_output_slice, rest) = output_slice.split_at_mut(fragment.size as usize);
            output_slice = rest;
            if fragment.location.is_compressed() {
                let chunk_index = fragment.location.compressed_first_chunk();
                let offset_within_chunk = fragment.location.compressed_offset_within_chunk();
                let chunk_data = self.get_chunk_data(chunk_index)?;
                if let Some(chunk_slice) = chunk_data.get(
                    offset_within_chunk as usize
                        ..offset_within_chunk as usize + fragment.size as usize,
                ) {
                    fragment_output_slice.copy_from_slice(chunk_slice);
                } else {
                    bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
                }
            } else {
                let file_offset = fragment.location.uncompressed_file_offset();
                trace!(
                    file_offset,
                    stream_offset,
                    fragment_len = fragment_output_slice.len(),
                    "reading uncompressed fragment"
                );
                self.file
                    .read_exact_at(fragment_output_slice, file_offset)?;
            }
        }
        // Every byte of the output buffer must have been written.
        assert!(output_slice.is_empty());
        Ok(StreamData::Box(output_buffer))
    }
    /// Returns a positioned, seekable reader over `stream`.
    pub fn get_stream_reader(&self, stream: u32) -> Result<StreamReader<'_, F>> {
        let fragments = self.stream_fragments_result(stream)?;
        Ok(StreamReader {
            msfz: self,
            size: fragments.iter().map(|f| f.size).sum(),
            fragments,
            pos: 0,
        })
    }
}
/// Cursor-style reader over a single stream inside an [`Msfz`] container.
pub struct StreamReader<'a, F> {
    // Parent container; used to fetch chunk data and read the file.
    msfz: &'a Msfz<F>,
    // Total stream size in bytes (sum of fragment sizes).
    size: u32,
    // The fragments that make up this stream, in order.
    fragments: &'a [Fragment],
    // Current position used by the `Read` implementation.
    pos: u64,
}
impl<'a, F> StreamReader<'a, F> {
    /// True when the stream contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }

    /// Total size of the stream, in bytes.
    pub fn stream_size(&self) -> u32 {
        self.size
    }
}
impl<'a, F: ReadAt> ReadAt for StreamReader<'a, F> {
    /// Reads up to `buf.len()` bytes starting at stream offset `offset`.
    ///
    /// Walks the stream's fragments, skipping those entirely before `offset`,
    /// then copies from each fragment (decompressing chunks on demand) until
    /// `buf` is full or the stream ends. Returns the number of bytes read.
    fn read_at(&self, mut buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let original_buf_len = buf.len();
        // Offset into the *current* fragment; starts as the stream offset and
        // shrinks as whole fragments are skipped.
        let mut current_offset: u64 = offset;
        for fragment in self.fragments.iter() {
            debug_assert!(!buf.is_empty());
            // This fragment lies entirely before the requested offset.
            if current_offset >= fragment.size as u64 {
                current_offset -= fragment.size as u64;
                continue;
            }
            let fragment_bytes_available = fragment.size - current_offset as u32;
            let num_bytes_xfer = buf.len().min(fragment_bytes_available as usize);
            let (buf_xfer, buf_rest) = buf.split_at_mut(num_bytes_xfer);
            buf = buf_rest;
            if fragment.location.is_compressed() {
                let chunk_index = fragment.location.compressed_first_chunk();
                let offset_within_chunk = fragment.location.compressed_offset_within_chunk();
                let chunk_slice = self.msfz.get_chunk_slice(
                    chunk_index,
                    offset_within_chunk + current_offset as u32,
                    num_bytes_xfer as u32,
                )?;
                buf_xfer.copy_from_slice(chunk_slice);
            } else {
                let file_offset = fragment.location.uncompressed_file_offset();
                self.msfz
                    .file
                    .read_exact_at(buf_xfer, file_offset + current_offset)?;
            }
            if buf.is_empty() {
                break;
            }
            // Bug fix: every subsequent fragment must be read from its
            // beginning. The previous code conditionally subtracted
            // `num_bytes_xfer` from `current_offset`, which could leave a
            // nonzero residue and silently skip bytes at the start of the
            // next fragment.
            current_offset = 0;
        }
        Ok(original_buf_len - buf.len())
    }
}
impl<'a, F: ReadAt> Read for StreamReader<'a, F> {
    /// Reads from the current position and advances it by the bytes read.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let num_read = self.read_at(buf, self.pos)?;
        self.pos += num_read as u64;
        Ok(num_read)
    }
}
impl<'a, F> Seek for StreamReader<'a, F> {
    /// Moves the cursor used by the `Read` implementation.
    ///
    /// Seeking past the end of the stream is allowed (later reads return 0
    /// bytes); resolving to a position before the start is `InvalidInput`.
    /// Arithmetic is checked: an offset that would overflow the position now
    /// reports `InvalidInput` instead of wrapping (the old `as i64` math
    /// could silently wrap in release builds).
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pos = match pos {
            SeekFrom::Start(p) => p,
            SeekFrom::End(offset) => (self.stream_size() as i64)
                // Stream sizes fit in u32, so the cast above is lossless.
                .checked_add(offset)
                .and_then(|p| u64::try_from(p).ok())
                .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::InvalidInput))?,
            SeekFrom::Current(offset) => self
                .pos
                // Rejects both overflow and moving before offset 0.
                .checked_add_signed(offset)
                .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::InvalidInput))?,
        };
        Ok(self.pos)
    }
}
/// Result of parsing the on-disk stream directory.
struct DecodedStreamDir {
    // All fragments of all streams, concatenated in stream order.
    fragments: Vec<Fragment>,
    // Start index into `fragments` for each stream, plus one trailing
    // end-of-list entry (len == num_streams + 1).
    stream_fragments: Vec<u32>,
}
/// Parses the stream directory.
///
/// For each stream the directory holds either the single sentinel size
/// `NIL_STREAM_SIZE` (a nil stream) or a sequence of `(size, location_lo,
/// location_hi)` fragment records terminated by a zero size. Compressed
/// fragment locations are validated against `chunk_table`.
fn decode_stream_dir(
    stream_dir_bytes: &[u8],
    num_streams: u32,
    chunk_table: &[ChunkEntry],
) -> anyhow::Result<DecodedStreamDir> {
    let mut dec = Decoder {
        bytes: stream_dir_bytes,
    };
    let mut fragments: Vec<Fragment> = Vec::new();
    // One start index per stream, plus a trailing end-of-list entry.
    let mut stream_fragments: Vec<u32> = Vec::with_capacity(num_streams as usize + 1);
    for _ in 0..num_streams {
        stream_fragments.push(fragments.len() as u32);
        let mut fragment_size = dec.u32()?;
        if fragment_size == NIL_STREAM_SIZE {
            // Nil stream: represented in memory as one zero-size nil fragment.
            fragments.push(Fragment {
                size: 0,
                location: FragmentLocation::NIL,
            });
            continue;
        }
        // A zero size terminates this stream's fragment list.
        while fragment_size != 0 {
            debug_assert_ne!(fragment_size, NIL_STREAM_SIZE);
            let location_lo = dec.u32()?;
            let location_hi = dec.u32()?;
            // All-ones is the nil sentinel; it is not a legal location for a
            // fragment of a non-nil stream.
            if location_lo == u32::MAX && location_hi == u32::MAX {
                bail!("The Stream Directory contains an invalid fragment record.");
            }
            let location = FragmentLocation {
                lo: location_lo,
                hi: location_hi,
            };
            if location.is_compressed() {
                // Validate the chunk index and in-chunk offset against the
                // chunk table.
                let first_chunk = location.compressed_first_chunk();
                let offset_within_chunk = location.compressed_offset_within_chunk();
                let Some(chunk) = chunk_table.get(first_chunk as usize) else {
                    bail!("The Stream Directory contains an invalid fragment record. Chunk index {first_chunk} exceeds the size of the chunk table.");
                };
                let uncompressed_chunk_size = chunk.uncompressed_size.get();
                if offset_within_chunk >= uncompressed_chunk_size {
                    bail!("The Stream Directory contains an invalid fragment record. offset_within_chunk {offset_within_chunk} exceeds the size of the chunk.");
                };
            } else {
                // Uncompressed locations are not validated here; file reads
                // are checked at access time by `read_exact_at`.
            }
            fragments.push(Fragment {
                size: fragment_size,
                location,
            });
            fragment_size = dec.u32()?;
            if fragment_size == NIL_STREAM_SIZE {
                bail!("Stream directory is malformed. It contains a non-initial fragment with size = NIL_STREAM_SIZE.");
            }
        }
    }
    // Trailing entry so each stream's range can be computed as [i]..[i + 1].
    stream_fragments.push(fragments.len() as u32);
    fragments.shrink_to_fit();
    Ok(DecodedStreamDir {
        fragments,
        stream_fragments,
    })
}
/// Minimal little-endian pull parser over a borrowed byte slice.
struct Decoder<'a> {
    bytes: &'a [u8],
}

impl<'a> Decoder<'a> {
    /// Consumes and returns the next `N` bytes, failing if fewer remain.
    fn next_n<const N: usize>(&mut self) -> anyhow::Result<&'a [u8; N]> {
        if N > self.bytes.len() {
            bail!("Buffer ran out of bytes");
        }
        let (head, tail) = self.bytes.split_at(N);
        self.bytes = tail;
        Ok(<&[u8; N]>::try_from(head).unwrap())
    }

    /// Consumes the next 4 bytes as a little-endian `u32`.
    fn u32(&mut self) -> anyhow::Result<u32> {
        self.next_n::<4>().map(|b| u32::from_le_bytes(*b))
    }
}
fn map_alloc_error<T>(result: Result<T, zerocopy::AllocError>) -> anyhow::Result<T> {
match result {
Ok(value) => Ok(value),
Err(zerocopy::AllocError) => {
Err(std::io::Error::from(std::io::ErrorKind::OutOfMemory).into())
}
}
}