1use crate::*;
2use anyhow::{Result, bail};
3use core::mem::size_of;
4use std::fs::File;
5use std::io::{Read, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::{Arc, OnceLock};
8use sync_file::{RandomAccessFile, ReadAt};
9use tracing::{debug, debug_span, info_span, trace, trace_span};
10use zerocopy::IntoBytes;
11
/// Reader for an MSFZ ("compressed MSF" / PDZ) container file.
///
/// `F` is the underlying random-access file implementation; it defaults to
/// [`RandomAccessFile`], but any [`ReadAt`] implementation works.
pub struct Msfz<F = RandomAccessFile> {
    // The underlying file; all reads go through `ReadAt::read_exact_at`.
    file: F,
    // All stream fragments, concatenated in stream order; `stream_fragments`
    // holds the per-stream boundaries into this vector.
    fragments: Vec<Fragment>,

    // For stream `i`, its fragments are
    // `fragments[stream_fragments[i] as usize..stream_fragments[i + 1] as usize]`.
    // Always contains one entry per stream plus a final end marker.
    stream_fragments: Vec<u32>,

    // On-disk chunk table describing each compressed chunk.
    chunk_table: Box<[ChunkEntry]>,
    // Lazily-populated cache of decompressed chunk data, parallel to `chunk_table`.
    chunk_cache: Vec<OnceLock<Arc<[u8]>>>,
}
34
/// A contiguous run of stream bytes stored at a single location in the file.
#[derive(Clone)]
pub struct Fragment {
    /// Size of this fragment in bytes (uncompressed).
    pub size: u32,
    /// Where the fragment's bytes live: inside a compressed chunk, at a raw
    /// file offset, or nowhere (nil).
    pub location: FragmentLocation,
}
43
44impl std::fmt::Debug for Fragment {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "size 0x{:05x} at {:?}", self.size, self.location)
47 }
48}
49
50impl std::fmt::Debug for FragmentLocation {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 if self.is_nil() {
53 f.write_str("nil")
54 } else if self.is_compressed() {
55 write!(
56 f,
57 "chunk {} : 0x{:04x}",
58 self.compressed_first_chunk(),
59 self.compressed_offset_within_chunk()
60 )
61 } else {
62 write!(
63 f,
64 "uncompressed at 0x{:06x}",
65 self.uncompressed_file_offset()
66 )
67 }
68 }
69}
70
/// Bit in `FragmentLocation::hi` that marks the fragment as living inside a
/// compressed chunk (the high bit of the upper 32-bit word).
const FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK: u32 = 1u32 << 31;

/// Mask selecting the 48-bit absolute file offset of an uncompressed fragment.
const UNCOMPRESSED_FRAGMENT_FILE_OFFSET_MASK: u64 = (1u64 << 48) - 1;
76
/// Packed 64-bit encoding of where a fragment's data is stored.
///
/// Interpretation (see the accessor methods):
/// * both words are `u32::MAX`   => nil (no data),
/// * high bit of `hi` is set     => compressed: `hi` (minus the flag bit) is
///   the chunk index and `lo` is the offset within the decompressed chunk,
/// * otherwise                   => uncompressed: the low 48 bits of `hi:lo`
///   form an absolute file offset.
#[derive(Copy, Clone)]
pub struct FragmentLocation {
    // Low 32 bits of the packed value.
    lo: u32,
    // High 32 bits of the packed value (carries the "compressed" flag bit).
    hi: u32,
}
85
86impl FragmentLocation {
87 const NIL: Self = Self {
90 lo: u32::MAX,
91 hi: u32::MAX,
92 };
93
94 fn is_nil(&self) -> bool {
95 self.lo == u32::MAX && self.hi == u32::MAX
96 }
97
98 pub fn is_compressed(&self) -> bool {
100 (self.hi & FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK) != 0
101 }
102
103 pub fn compressed_first_chunk(&self) -> u32 {
107 debug_assert!(!self.is_nil());
108 debug_assert!(self.is_compressed());
109 self.hi & !FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK
110 }
111
112 fn compressed_offset_within_chunk(&self) -> u32 {
113 debug_assert!(!self.is_nil());
114 debug_assert!(self.is_compressed());
115 self.lo
116 }
117
118 fn uncompressed_file_offset(&self) -> u64 {
119 debug_assert!(!self.is_nil());
120 debug_assert!(!self.is_compressed());
121 (((self.hi as u64) << 32) | (self.lo as u64)) & UNCOMPRESSED_FRAGMENT_FILE_OFFSET_MASK
122 }
123}
124
125impl Msfz<RandomAccessFile> {
126 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
128 let f = open_options_shared(File::options().read(true)).open(path)?;
129 let raf = RandomAccessFile::from(f);
130 Self::from_file(raf)
131 }
132}
133
134impl<F: ReadAt> Msfz<F> {
135 pub fn from_file(file: F) -> Result<Self> {
137 let _span = info_span!("Msfz::from_file").entered();
138
139 let mut header: MsfzFileHeader = MsfzFileHeader::new_zeroed();
140 file.read_exact_at(header.as_mut_bytes(), 0)?;
141
142 if header.signature != MSFZ_FILE_SIGNATURE {
143 bail!("This file does not have a PDZ file signature.");
144 }
145
146 if header.version.get() != MSFZ_FILE_VERSION_V0 {
147 bail!("This PDZ file uses a version number that is not supported.");
148 }
149
150 let num_streams = header.num_streams.get();
152 if num_streams == 0 {
153 bail!("The stream directory is invalid; it is empty.");
154 }
155
156 let stream_dir_size_uncompressed = header.stream_dir_size_uncompressed.get() as usize;
157 let stream_dir_size_compressed = header.stream_dir_size_compressed.get() as usize;
158 let stream_dir_file_offset = header.stream_dir_offset.get();
159 let stream_dir_compression = header.stream_dir_compression.get();
160 debug!(
161 num_streams,
162 stream_dir_size_uncompressed,
163 stream_dir_size_compressed,
164 stream_dir_compression,
165 stream_dir_file_offset,
166 "reading stream directory"
167 );
168
169 let mut stream_dir_bytes: Vec<u8> =
170 map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_uncompressed))?;
171 if let Some(compression) = Compression::try_from_code_opt(stream_dir_compression)? {
172 let mut compressed_stream_dir: Vec<u8> =
173 map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_compressed))?;
174 file.read_exact_at(
175 compressed_stream_dir.as_mut_bytes(),
176 header.stream_dir_offset.get(),
177 )?;
178
179 debug!("decompressing stream directory");
180
181 crate::compress_utils::decompress_to_slice(
182 compression,
183 &compressed_stream_dir,
184 &mut stream_dir_bytes,
185 )?;
186 } else {
187 if stream_dir_size_uncompressed != stream_dir_size_compressed {
188 bail!(
189 "This PDZ file is invalid. The Stream Directory is not compressed, but has inconsistent compressed vs. uncompressed sizes."
190 );
191 }
192 file.read_exact_at(stream_dir_bytes.as_mut_bytes(), stream_dir_file_offset)?;
193 }
194
195 let num_chunks = header.num_chunks.get() as usize;
197 let chunk_index_size = header.chunk_table_size.get() as usize;
198 if chunk_index_size != num_chunks * size_of::<ChunkEntry>() {
199 bail!("This PDZ file is invalid. num_chunks and chunk_index_size are not consistent.");
200 }
201
202 let chunk_table_offset = header.chunk_table_offset.get();
203 let mut chunk_table: Box<[ChunkEntry]> =
204 map_alloc_error(FromZeros::new_box_zeroed_with_elems(num_chunks))?;
205 if num_chunks != 0 {
206 debug!(
207 num_chunks,
208 chunk_table_offset, "reading compressed chunk table"
209 );
210 file.read_exact_at(chunk_table.as_mut_bytes(), chunk_table_offset)?;
211 } else {
212 }
214
215 let mut chunk_cache = Vec::with_capacity(num_chunks);
216 chunk_cache.resize_with(num_chunks, Default::default);
217
218 let stream_dir = decode_stream_dir(&stream_dir_bytes, num_streams, &chunk_table)?;
221
222 Ok(Self {
223 file,
224 fragments: stream_dir.fragments,
225 stream_fragments: stream_dir.stream_fragments,
226 chunk_table,
227 chunk_cache,
228 })
229 }
230
231 pub fn num_streams(&self) -> u32 {
233 (self.stream_fragments.len() - 1) as u32
234 }
235
236 fn stream_fragments_result(&self, stream: u32) -> Result<&[Fragment]> {
237 self.stream_fragments(stream)
238 .ok_or_else(|| anyhow::anyhow!("Stream index is out of range"))
239 }
240
241 pub fn stream_fragments(&self, stream: u32) -> Option<&[Fragment]> {
245 let i = stream as usize;
246 if i < self.stream_fragments.len() - 1 {
247 let start = self.stream_fragments[i] as usize;
248 let end = self.stream_fragments[i + 1] as usize;
249 let fragments = &self.fragments[start..end];
250 match fragments {
251 [f, ..] if f.location.is_nil() => Some(&[]),
252 _ => Some(fragments),
253 }
254 } else {
255 None
256 }
257 }
258
259 pub fn stream_size(&self, stream: u32) -> Result<u64> {
265 let fragments = self.stream_fragments_result(stream)?;
266 Ok(fragments.iter().map(|f| f.size as u64).sum())
267 }
268
269 #[allow(clippy::match_like_matches_macro)]
276 pub fn is_stream_valid(&self, stream: u32) -> bool {
277 assert!(!self.stream_fragments.is_empty());
278
279 if stream == 0 {
280 return false;
281 }
282
283 let i = stream as usize;
284 if i < self.stream_fragments.len() - 1 {
285 let start = self.stream_fragments[i] as usize;
286 let end = self.stream_fragments[i + 1] as usize;
287 let fragments = &self.fragments[start..end];
288 match fragments {
289 [f, ..] if f.location.is_nil() => false,
290 _ => true,
291 }
292 } else {
293 false
294 }
295 }
296
297 fn get_chunk_slice(&self, chunk: u32, offset: u32, size: u32) -> std::io::Result<&[u8]> {
300 let chunk_data = self.get_chunk_data(chunk)?;
301 if let Some(slice) = chunk_data.get(offset as usize..offset as usize + size as usize) {
302 Ok(slice)
303 } else {
304 Err(std::io::Error::new(
305 std::io::ErrorKind::InvalidData,
306 "PDZ file contains invalid byte ranges within a chunk",
307 ))
308 }
309 }
310
311 fn get_chunk_data(&self, chunk_index: u32) -> std::io::Result<&Arc<[u8]>> {
312 let _span = trace_span!("get_chunk_data").entered();
313 trace!(chunk_index);
314
315 debug_assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
316
317 let Some(slot) = self.chunk_cache.get(chunk_index as usize) else {
318 return Err(std::io::Error::new(
319 std::io::ErrorKind::InvalidInput,
320 "Chunk index is out of range.",
321 ));
322 };
323
324 if let Some(arc) = slot.get() {
325 trace!(chunk_index, "found chunk in cache");
326 return Ok(arc);
327 }
328
329 let arc = self.load_chunk_data(chunk_index)?;
330 Ok(slot.get_or_init(move || arc))
331 }
332
333 #[inline(never)]
336 fn load_chunk_data(&self, chunk_index: u32) -> std::io::Result<Arc<[u8]>> {
337 assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
338
339 let _span = debug_span!("load_chunk_data").entered();
340
341 let entry = &self.chunk_table[chunk_index as usize];
347
348 let compression_opt =
349 Compression::try_from_code_opt(entry.compression.get()).map_err(|_| {
350 std::io::Error::new(
351 std::io::ErrorKind::Unsupported,
352 "Chunk uses an unrecognized compression algorithm",
353 )
354 })?;
355
356 let mut compressed_data: Box<[u8]> =
358 FromZeros::new_box_zeroed_with_elems(entry.compressed_size.get() as usize)
359 .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
360 self.file
361 .read_exact_at(&mut compressed_data, entry.file_offset.get())?;
362
363 let uncompressed_data: Box<[u8]> = if let Some(compression) = compression_opt {
364 let mut uncompressed_data: Box<[u8]> =
365 FromZeros::new_box_zeroed_with_elems(entry.uncompressed_size.get() as usize)
366 .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
367
368 self::compress_utils::decompress_to_slice(
369 compression,
370 &compressed_data,
371 &mut uncompressed_data,
372 )?;
373 uncompressed_data
374 } else {
375 compressed_data
377 };
378
379 Ok(Arc::from(uncompressed_data))
383 }
384
385 pub fn read_stream(&self, stream: u32) -> anyhow::Result<StreamData> {
390 let _span = trace_span!("read_stream_to_cow").entered();
391 trace!(stream);
392
393 let mut fragments = self.stream_fragments_result(stream)?;
394
395 match fragments.first() {
396 Some(f) if f.location.is_nil() => fragments = &[],
397 _ => {}
398 }
399
400 if fragments.is_empty() {
402 return Ok(StreamData::empty());
403 }
404
405 if fragments.len() == 1 && fragments[0].location.is_compressed() {
409 let chunk_index = fragments[0].location.compressed_first_chunk();
410 let offset_within_chunk = fragments[0].location.compressed_offset_within_chunk();
411
412 let chunk_data = self.get_chunk_data(chunk_index)?;
413 let fragment_range = offset_within_chunk as usize
414 ..offset_within_chunk as usize + fragments[0].size as usize;
415
416 if chunk_data.get(fragment_range.clone()).is_none() {
418 bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
419 }
420
421 return Ok(StreamData::ArcSlice(Arc::clone(chunk_data), fragment_range));
422 }
423
424 let stream_size: u32 = fragments.iter().map(|f| f.size).sum();
425 let stream_usize = stream_size as usize;
426
427 let mut output_buffer: Box<[u8]> = FromZeros::new_box_zeroed_with_elems(stream_usize)
429 .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
430 let mut output_slice: &mut [u8] = &mut output_buffer;
431
432 for fragment in fragments.iter() {
433 let stream_offset = stream_usize - output_slice.len();
434
435 let (fragment_output_slice, rest) = output_slice.split_at_mut(fragment.size as usize);
438 output_slice = rest;
439
440 if fragment.location.is_compressed() {
441 let chunk_index = fragment.location.compressed_first_chunk();
442 let offset_within_chunk = fragment.location.compressed_offset_within_chunk();
443
444 let chunk_data = self.get_chunk_data(chunk_index)?;
445 if let Some(chunk_slice) = chunk_data.get(
446 offset_within_chunk as usize
447 ..offset_within_chunk as usize + fragment.size as usize,
448 ) {
449 fragment_output_slice.copy_from_slice(chunk_slice);
450 } else {
451 bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
452 }
453 } else {
454 let file_offset = fragment.location.uncompressed_file_offset();
455 trace!(
457 file_offset,
458 stream_offset,
459 fragment_len = fragment_output_slice.len(),
460 "reading uncompressed fragment"
461 );
462 self.file
463 .read_exact_at(fragment_output_slice, file_offset)?;
464 }
465 }
466
467 assert!(output_slice.is_empty());
468
469 Ok(StreamData::Box(output_buffer))
470 }
471
472 pub fn get_stream_reader(&self, stream: u32) -> Result<StreamReader<'_, F>> {
480 let fragments = self.stream_fragments_result(stream)?;
481 Ok(StreamReader {
482 msfz: self,
483 size: fragments.iter().map(|f| f.size as u64).sum(),
484 fragments,
485 pos: 0,
486 })
487 }
488
    /// Total number of fragments across all streams.
    pub fn num_fragments(&self) -> usize {
        self.fragments.len()
    }
493
    /// All fragments for all streams, concatenated in stream order.
    pub fn fragments(&self) -> &[Fragment] {
        &self.fragments
    }
498
    /// Number of compressed chunks described by the chunk table.
    pub fn num_chunks(&self) -> usize {
        self.chunk_table.len()
    }
503
    /// The raw chunk table entries, as read from the file.
    pub fn chunks(&self) -> &[ChunkEntry] {
        &self.chunk_table
    }
508}
509
/// A lazy `Read`/`Seek`/`ReadAt` view over a single stream inside an [`Msfz`].
pub struct StreamReader<'a, F> {
    // The containing file; chunk data and raw reads come from here.
    msfz: &'a Msfz<F>,
    // Total stream size in bytes (sum of fragment sizes).
    size: u64,
    // The stream's fragments, in stream order.
    fragments: &'a [Fragment],
    // Current position used by the `Read`/`Seek` implementations.
    pos: u64,
}
517
518impl<'a, F> StreamReader<'a, F> {
519 pub fn is_empty(&self) -> bool {
521 self.stream_size() == 0
522 }
523
524 pub fn stream_size(&self) -> u64 {
528 self.size
529 }
530}
531
impl<'a, F: ReadAt> ReadAt for StreamReader<'a, F> {
    // Reads up to `buf.len()` bytes starting at stream offset `offset`.
    //
    // Walks the fragment list in order: fragments lying entirely before
    // `offset` are skipped, then each overlapping fragment is copied (from
    // its cached chunk, or straight from the file for uncompressed data)
    // until `buf` is full or the stream ends. Returns the number of bytes
    // actually read, which is less than `buf.len()` when the requested range
    // extends past the end of the stream.
    fn read_at(&self, mut buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        let original_buf_len = buf.len();
        // Offset relative to the current fragment; reduced as fragments are
        // skipped, and reset to 0 after the first fragment actually read.
        let mut current_offset: u64 = offset;

        for fragment in self.fragments.iter() {
            debug_assert!(!buf.is_empty());

            // This fragment lies entirely before the requested offset.
            if current_offset >= fragment.size as u64 {
                current_offset -= fragment.size as u64;
                continue;
            }

            // Here current_offset < fragment.size, so both the narrowing cast
            // and the subtraction below are safe.
            let fragment_bytes_available = fragment.size - current_offset as u32;

            let num_bytes_xfer = buf.len().min(fragment_bytes_available as usize);
            let (buf_xfer, buf_rest) = buf.split_at_mut(num_bytes_xfer);
            buf = buf_rest;

            if fragment.location.is_compressed() {
                let chunk_index = fragment.location.compressed_first_chunk();
                let offset_within_chunk = fragment.location.compressed_offset_within_chunk();

                // Copy out of the (cached) decompressed chunk.
                let chunk_slice = self.msfz.get_chunk_slice(
                    chunk_index,
                    offset_within_chunk + current_offset as u32,
                    num_bytes_xfer as u32,
                )?;
                buf_xfer.copy_from_slice(chunk_slice);
            } else {
                // Uncompressed fragment: read directly from the file.
                let file_offset = fragment.location.uncompressed_file_offset();
                self.msfz
                    .file
                    .read_exact_at(buf_xfer, file_offset + current_offset)?;
            }

            if buf.is_empty() {
                break;
            }

            // Subsequent fragments are read from their beginning.
            current_offset = 0;
        }

        Ok(original_buf_len - buf.len())
    }
}
588
589impl<'a, F: ReadAt> Read for StreamReader<'a, F> {
590 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
591 let n = self.read_at(buf, self.pos)?;
592 self.pos += n as u64;
593 Ok(n)
594 }
595}
596
597impl<'a, F> Seek for StreamReader<'a, F> {
598 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
599 match pos {
600 SeekFrom::Start(p) => self.pos = p,
601 SeekFrom::End(offset) => {
602 let new_pos = self.stream_size() as i64 + offset;
603 if new_pos < 0 {
604 return Err(std::io::ErrorKind::InvalidInput.into());
605 }
606 self.pos = new_pos as u64;
607 }
608 SeekFrom::Current(offset) => {
609 let new_pos = self.pos as i64 + offset;
610 if new_pos < 0 {
611 return Err(std::io::ErrorKind::InvalidInput.into());
612 }
613 self.pos = new_pos as u64;
614 }
615 }
616 Ok(self.pos)
617 }
618}
619
/// Result of parsing the on-disk stream directory.
struct DecodedStreamDir {
    // All fragments, concatenated in stream order.
    fragments: Vec<Fragment>,
    // Per-stream start indices into `fragments`, plus a final end marker.
    stream_fragments: Vec<u32>,
}
624
625fn decode_stream_dir(
626 stream_dir_bytes: &[u8],
627 num_streams: u32,
628 chunk_table: &[ChunkEntry],
629) -> anyhow::Result<DecodedStreamDir> {
630 let mut dec = Decoder {
631 bytes: stream_dir_bytes,
632 };
633
634 let mut fragments: Vec<Fragment> = Vec::new();
635 let mut stream_fragments: Vec<u32> = Vec::with_capacity(num_streams as usize + 1);
636
637 for _ in 0..num_streams {
638 stream_fragments.push(fragments.len() as u32);
639
640 let mut fragment_size = dec.u32()?;
641
642 if fragment_size == NIL_STREAM_SIZE {
643 fragments.push(Fragment {
647 size: 0,
648 location: FragmentLocation::NIL,
649 });
650 continue;
651 }
652
653 while fragment_size != 0 {
654 debug_assert_ne!(fragment_size, NIL_STREAM_SIZE);
655
656 let location_lo = dec.u32()?;
657 let location_hi = dec.u32()?;
658
659 if location_lo == u32::MAX && location_hi == u32::MAX {
660 bail!("The Stream Directory contains an invalid fragment record.");
661 }
662
663 let location = FragmentLocation {
664 lo: location_lo,
665 hi: location_hi,
666 };
667
668 if location.is_compressed() {
669 let first_chunk = location.compressed_first_chunk();
670 let offset_within_chunk = location.compressed_offset_within_chunk();
671
672 let Some(chunk) = chunk_table.get(first_chunk as usize) else {
673 bail!(
674 "The Stream Directory contains an invalid fragment record. Chunk index {first_chunk} exceeds the size of the chunk table."
675 );
676 };
677
678 let uncompressed_chunk_size = chunk.uncompressed_size.get();
679
680 if offset_within_chunk >= uncompressed_chunk_size {
684 bail!(
685 "The Stream Directory contains an invalid fragment record. offset_within_chunk {offset_within_chunk} exceeds the size of the chunk."
686 );
687 };
688
689 } else {
692 }
698
699 fragments.push(Fragment {
700 size: fragment_size,
701 location,
702 });
703
704 fragment_size = dec.u32()?;
707 if fragment_size == NIL_STREAM_SIZE {
708 bail!(
709 "Stream directory is malformed. It contains a non-initial fragment with size = NIL_STREAM_SIZE."
710 );
711 }
712 }
714 }
715
716 stream_fragments.push(fragments.len() as u32);
717
718 fragments.shrink_to_fit();
719
720 Ok(DecodedStreamDir {
721 fragments,
722 stream_fragments,
723 })
724}
725
/// Minimal little-endian cursor over a byte slice.
struct Decoder<'a> {
    // Remaining unread bytes; consumed from the front.
    bytes: &'a [u8],
}
729
730impl<'a> Decoder<'a> {
731 fn next_n<const N: usize>(&mut self) -> anyhow::Result<&'a [u8; N]> {
732 if self.bytes.len() < N {
733 bail!("Buffer ran out of bytes");
734 }
735
736 let (lo, hi) = self.bytes.split_at(N);
737 self.bytes = hi;
738 Ok(<&[u8; N]>::try_from(lo).unwrap())
741 }
742
743 fn u32(&mut self) -> anyhow::Result<u32> {
744 Ok(u32::from_le_bytes(*self.next_n()?))
745 }
746}
747
748fn map_alloc_error<T>(result: Result<T, zerocopy::AllocError>) -> anyhow::Result<T> {
749 match result {
750 Ok(value) => Ok(value),
751 Err(zerocopy::AllocError) => {
752 Err(std::io::Error::from(std::io::ErrorKind::OutOfMemory).into())
753 }
754 }
755}