1use crate::*;
2use anyhow::{Result, bail};
3use core::mem::size_of;
4use std::fs::File;
5use std::io::{Read, Seek, SeekFrom};
6use std::path::Path;
7use std::sync::{Arc, OnceLock};
8use sync_file::{RandomAccessFile, ReadAt};
9use tracing::{debug, debug_span, info_span, trace, trace_span};
10use zerocopy::IntoBytes;
11
/// Reader for an MSFZ ("PDZ") container file.
///
/// Generic over any `ReadAt` implementation; defaults to a shared
/// `RandomAccessFile` opened by [`Msfz::open`].
pub struct Msfz<F = RandomAccessFile> {
    // Underlying file (or any `ReadAt` source); all reads go through this.
    file: F,
    // All fragments of all streams, concatenated in stream order.
    // `stream_fragments` holds the per-stream index ranges into this vector.
    fragments: Vec<Fragment>,

    // Index ranges into `fragments`: stream `i` owns
    // `fragments[stream_fragments[i]..stream_fragments[i + 1]]`.
    // Built by `decode_stream_dir` with `num_streams + 1` entries, so the
    // last range is always well-defined.
    stream_fragments: Vec<u32>,

    // One entry per chunk, read from the file's chunk table at open time.
    chunk_table: Box<[ChunkEntry]>,
    // Lazily-populated decompressed chunk contents, parallel to `chunk_table`.
    chunk_cache: Vec<OnceLock<Arc<[u8]>>>,
}
34
/// One contiguous run of a stream's bytes.
#[derive(Clone)]
struct Fragment {
    // Length of this fragment in (uncompressed) bytes.
    size: u32,
    // Where the fragment's bytes are stored; see `FragmentLocation`.
    location: FragmentLocation,
}
41
impl std::fmt::Debug for Fragment {
    // Compact single-line form, e.g. `size 0x001f4 at <location>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "size 0x{:05x} at {:?}", self.size, self.location)
    }
}
47
48impl std::fmt::Debug for FragmentLocation {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 if self.is_nil() {
51 f.write_str("nil")
52 } else if self.is_compressed() {
53 write!(
54 f,
55 "uncompressed at 0x{:06x}",
56 self.uncompressed_file_offset()
57 )
58 } else {
59 write!(
60 f,
61 "chunk {} : 0x{:04x}",
62 self.compressed_first_chunk(),
63 self.compressed_offset_within_chunk()
64 )
65 }
66 }
67}
68
/// Bit set in `hi` when the fragment's bytes live inside a compressed chunk.
const FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK: u32 = 1u32 << 31;

/// Packed 64-bit location of a fragment's bytes.
///
/// Encodings:
/// * both words all-ones: the "nil" sentinel (no data);
/// * compressed bit set in `hi`: `hi & !mask` is a chunk index and `lo` is
///   the byte offset within that decompressed chunk;
/// * otherwise: `hi:lo` is the absolute file offset of uncompressed data.
#[derive(Copy, Clone)]
struct FragmentLocation {
    lo: u32,
    hi: u32,
}

impl FragmentLocation {
    /// Sentinel meaning "this fragment has no data".
    const NIL: Self = Self {
        lo: u32::MAX,
        hi: u32::MAX,
    };

    /// True for the nil sentinel (both words all-ones).
    fn is_nil(&self) -> bool {
        self.hi == u32::MAX && self.lo == u32::MAX
    }

    /// True when the fragment's bytes are stored inside a compressed chunk.
    fn is_compressed(&self) -> bool {
        self.hi & FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK != 0
    }

    /// Chunk index of a compressed fragment (flag bit masked off).
    fn compressed_first_chunk(&self) -> u32 {
        debug_assert!(!self.is_nil());
        debug_assert!(self.is_compressed());
        self.hi & !FRAGMENT_LOCATION_32BIT_IS_COMPRESSED_MASK
    }

    /// Byte offset within the decompressed chunk, for a compressed fragment.
    fn compressed_offset_within_chunk(&self) -> u32 {
        debug_assert!(!self.is_nil());
        debug_assert!(self.is_compressed());
        self.lo
    }

    /// Absolute file offset of an uncompressed fragment.
    fn uncompressed_file_offset(&self) -> u64 {
        debug_assert!(!self.is_nil());
        debug_assert!(!self.is_compressed());
        (u64::from(self.hi) << 32) | u64::from(self.lo)
    }
}
114
115impl Msfz<RandomAccessFile> {
116 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
118 let f = open_options_shared(File::options().read(true)).open(path)?;
119 let raf = RandomAccessFile::from(f);
120 Self::from_file(raf)
121 }
122}
123
124impl<F: ReadAt> Msfz<F> {
125 pub fn from_file(file: F) -> Result<Self> {
127 let _span = info_span!("Msfz::from_file").entered();
128
129 let mut header: MsfzFileHeader = MsfzFileHeader::new_zeroed();
130 file.read_exact_at(header.as_mut_bytes(), 0)?;
131
132 if header.signature != MSFZ_FILE_SIGNATURE {
133 bail!("This file does not have a PDZ file signature.");
134 }
135
136 if header.version.get() != MSFZ_FILE_VERSION_V0 {
137 bail!("This PDZ file uses a version number that is not supported.");
138 }
139
140 let num_streams = header.num_streams.get();
142 if num_streams == 0 {
143 bail!("The stream directory is invalid; it is empty.");
144 }
145
146 let stream_dir_size_uncompressed = header.stream_dir_size_uncompressed.get() as usize;
147 let stream_dir_size_compressed = header.stream_dir_size_compressed.get() as usize;
148 let stream_dir_file_offset = header.stream_dir_offset.get();
149 let stream_dir_compression = header.stream_dir_compression.get();
150 debug!(
151 num_streams,
152 stream_dir_size_uncompressed,
153 stream_dir_size_compressed,
154 stream_dir_compression,
155 stream_dir_file_offset,
156 "reading stream directory"
157 );
158
159 let mut stream_dir_bytes: Vec<u8> =
160 map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_uncompressed))?;
161 if let Some(compression) = Compression::try_from_code_opt(stream_dir_compression)? {
162 let mut compressed_stream_dir: Vec<u8> =
163 map_alloc_error(FromZeros::new_vec_zeroed(stream_dir_size_compressed))?;
164 file.read_exact_at(
165 compressed_stream_dir.as_mut_bytes(),
166 header.stream_dir_offset.get(),
167 )?;
168
169 debug!("decompressing stream directory");
170
171 crate::compress_utils::decompress_to_slice(
172 compression,
173 &compressed_stream_dir,
174 &mut stream_dir_bytes,
175 )?;
176 } else {
177 if stream_dir_size_uncompressed != stream_dir_size_compressed {
178 bail!(
179 "This PDZ file is invalid. The Stream Directory is not compressed, but has inconsistent compressed vs. uncompressed sizes."
180 );
181 }
182 file.read_exact_at(stream_dir_bytes.as_mut_bytes(), stream_dir_file_offset)?;
183 }
184
185 let num_chunks = header.num_chunks.get() as usize;
187 let chunk_index_size = header.chunk_table_size.get() as usize;
188 if chunk_index_size != num_chunks * size_of::<ChunkEntry>() {
189 bail!("This PDZ file is invalid. num_chunks and chunk_index_size are not consistent.");
190 }
191
192 let chunk_table_offset = header.chunk_table_offset.get();
193 let mut chunk_table: Box<[ChunkEntry]> =
194 map_alloc_error(FromZeros::new_box_zeroed_with_elems(num_chunks))?;
195 if num_chunks != 0 {
196 debug!(
197 num_chunks,
198 chunk_table_offset, "reading compressed chunk table"
199 );
200 file.read_exact_at(chunk_table.as_mut_bytes(), chunk_table_offset)?;
201 } else {
202 }
204
205 let mut chunk_cache = Vec::with_capacity(num_chunks);
206 chunk_cache.resize_with(num_chunks, Default::default);
207
208 let stream_dir = decode_stream_dir(&stream_dir_bytes, num_streams, &chunk_table)?;
211
212 Ok(Self {
213 file,
214 fragments: stream_dir.fragments,
215 stream_fragments: stream_dir.stream_fragments,
216 chunk_table,
217 chunk_cache,
218 })
219 }
220
    /// Number of streams in this file.
    ///
    /// `stream_fragments` always holds one terminator entry beyond the
    /// per-stream entries, hence `len() - 1`.
    pub fn num_streams(&self) -> u32 {
        (self.stream_fragments.len() - 1) as u32
    }
225
226 fn stream_fragments_result(&self, stream: u32) -> Result<&[Fragment]> {
227 self.stream_fragments(stream)
228 .ok_or_else(|| anyhow::anyhow!("Stream index is out of range"))
229 }
230
231 fn stream_fragments(&self, stream: u32) -> Option<&[Fragment]> {
235 let i = stream as usize;
236 if i < self.stream_fragments.len() - 1 {
237 let start = self.stream_fragments[i] as usize;
238 let end = self.stream_fragments[i + 1] as usize;
239 let fragments = &self.fragments[start..end];
240 match fragments {
241 [f, ..] if f.location.is_nil() => Some(&[]),
242 _ => Some(fragments),
243 }
244 } else {
245 None
246 }
247 }
248
249 pub fn stream_size(&self, stream: u32) -> Result<u64> {
255 let fragments = self.stream_fragments_result(stream)?;
256 Ok(fragments.iter().map(|f| f.size as u64).sum())
257 }
258
259 #[allow(clippy::match_like_matches_macro)]
266 pub fn is_stream_valid(&self, stream: u32) -> bool {
267 assert!(!self.stream_fragments.is_empty());
268
269 if stream == 0 {
270 return false;
271 }
272
273 let i = stream as usize;
274 if i < self.stream_fragments.len() - 1 {
275 let start = self.stream_fragments[i] as usize;
276 let end = self.stream_fragments[i + 1] as usize;
277 let fragments = &self.fragments[start..end];
278 match fragments {
279 [f, ..] if f.location.is_nil() => false,
280 _ => true,
281 }
282 } else {
283 false
284 }
285 }
286
287 fn get_chunk_slice(&self, chunk: u32, offset: u32, size: u32) -> std::io::Result<&[u8]> {
290 let chunk_data = self.get_chunk_data(chunk)?;
291 if let Some(slice) = chunk_data.get(offset as usize..offset as usize + size as usize) {
292 Ok(slice)
293 } else {
294 Err(std::io::Error::new(
295 std::io::ErrorKind::InvalidData,
296 "PDZ file contains invalid byte ranges within a chunk",
297 ))
298 }
299 }
300
301 fn get_chunk_data(&self, chunk_index: u32) -> std::io::Result<&Arc<[u8]>> {
302 let _span = trace_span!("get_chunk_data").entered();
303 trace!(chunk_index);
304
305 debug_assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
306
307 let Some(slot) = self.chunk_cache.get(chunk_index as usize) else {
308 return Err(std::io::Error::new(
309 std::io::ErrorKind::InvalidInput,
310 "Chunk index is out of range.",
311 ));
312 };
313
314 if let Some(arc) = slot.get() {
315 trace!(chunk_index, "found chunk in cache");
316 return Ok(arc);
317 }
318
319 let arc = self.load_chunk_data(chunk_index)?;
320 Ok(slot.get_or_init(move || arc))
321 }
322
323 #[inline(never)]
326 fn load_chunk_data(&self, chunk_index: u32) -> std::io::Result<Arc<[u8]>> {
327 assert_eq!(self.chunk_cache.len(), self.chunk_table.len());
328
329 let _span = debug_span!("load_chunk_data").entered();
330
331 let entry = &self.chunk_table[chunk_index as usize];
337
338 let compression_opt =
339 Compression::try_from_code_opt(entry.compression.get()).map_err(|_| {
340 std::io::Error::new(
341 std::io::ErrorKind::Unsupported,
342 "Chunk uses an unrecognized compression algorithm",
343 )
344 })?;
345
346 let mut compressed_data: Box<[u8]> =
348 FromZeros::new_box_zeroed_with_elems(entry.compressed_size.get() as usize)
349 .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
350 self.file
351 .read_exact_at(&mut compressed_data, entry.file_offset.get())?;
352
353 let uncompressed_data: Box<[u8]> = if let Some(compression) = compression_opt {
354 let mut uncompressed_data: Box<[u8]> =
355 FromZeros::new_box_zeroed_with_elems(entry.uncompressed_size.get() as usize)
356 .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
357
358 self::compress_utils::decompress_to_slice(
359 compression,
360 &compressed_data,
361 &mut uncompressed_data,
362 )?;
363 uncompressed_data
364 } else {
365 compressed_data
367 };
368
369 Ok(Arc::from(uncompressed_data))
373 }
374
    /// Reads an entire stream into memory.
    ///
    /// Returns:
    /// * `StreamData::empty()` for empty or nil streams;
    /// * `StreamData::ArcSlice` — a zero-copy view into a cached chunk — when
    ///   the stream is a single compressed fragment;
    /// * `StreamData::Box` — a freshly assembled buffer — otherwise.
    pub fn read_stream(&self, stream: u32) -> anyhow::Result<StreamData> {
        let _span = trace_span!("read_stream_to_cow").entered();
        trace!(stream);

        let mut fragments = self.stream_fragments_result(stream)?;

        // Treat a nil stream the same as an empty stream.
        match fragments.first() {
            Some(f) if f.location.is_nil() => fragments = &[],
            _ => {}
        }

        if fragments.is_empty() {
            return Ok(StreamData::empty());
        }

        // Fast path: a stream made of one compressed fragment can alias the
        // cached (Arc-shared) chunk data instead of copying it.
        if fragments.len() == 1 && fragments[0].location.is_compressed() {
            let chunk_index = fragments[0].location.compressed_first_chunk();
            let offset_within_chunk = fragments[0].location.compressed_offset_within_chunk();

            let chunk_data = self.get_chunk_data(chunk_index)?;
            let fragment_range = offset_within_chunk as usize
                ..offset_within_chunk as usize + fragments[0].size as usize;

            // Validate the range now so consumers of the returned slice can't
            // index out of bounds later.
            if chunk_data.get(fragment_range.clone()).is_none() {
                bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
            }

            return Ok(StreamData::ArcSlice(Arc::clone(chunk_data), fragment_range));
        }

        // Slow path: allocate one buffer for the whole stream and copy each
        // fragment into place, in order.
        let stream_size: u32 = fragments.iter().map(|f| f.size).sum();
        let stream_usize = stream_size as usize;

        let mut output_buffer: Box<[u8]> = FromZeros::new_box_zeroed_with_elems(stream_usize)
            .map_err(|_| std::io::Error::from(std::io::ErrorKind::OutOfMemory))?;
        // `output_slice` is the not-yet-written tail of `output_buffer`.
        let mut output_slice: &mut [u8] = &mut output_buffer;

        for fragment in fragments.iter() {
            // Offset of this fragment within the stream (bytes written so far).
            let stream_offset = stream_usize - output_slice.len();

            let (fragment_output_slice, rest) = output_slice.split_at_mut(fragment.size as usize);
            output_slice = rest;

            if fragment.location.is_compressed() {
                let chunk_index = fragment.location.compressed_first_chunk();
                let offset_within_chunk = fragment.location.compressed_offset_within_chunk();

                let chunk_data = self.get_chunk_data(chunk_index)?;
                if let Some(chunk_slice) = chunk_data.get(
                    offset_within_chunk as usize
                        ..offset_within_chunk as usize + fragment.size as usize,
                ) {
                    fragment_output_slice.copy_from_slice(chunk_slice);
                } else {
                    bail!("PDZ data is invalid. Stream fragment byte range is out of range.");
                }
            } else {
                let file_offset = fragment.location.uncompressed_file_offset();
                trace!(
                    file_offset,
                    stream_offset,
                    fragment_len = fragment_output_slice.len(),
                    "reading uncompressed fragment"
                );
                self.file
                    .read_exact_at(fragment_output_slice, file_offset)?;
            }
        }

        // Every byte of the output buffer must have been written exactly once.
        assert!(output_slice.is_empty());

        Ok(StreamData::Box(output_buffer))
    }
461
462 pub fn get_stream_reader(&self, stream: u32) -> Result<StreamReader<'_, F>> {
470 let fragments = self.stream_fragments_result(stream)?;
471 Ok(StreamReader {
472 msfz: self,
473 size: fragments.iter().map(|f| f.size as u64).sum(),
474 fragments,
475 pos: 0,
476 })
477 }
478
    /// Total number of fragments across all streams.
    pub fn num_fragments(&self) -> usize {
        self.fragments.len()
    }

    /// Number of chunks listed in the file's chunk table.
    pub fn num_chunks(&self) -> usize {
        self.chunk_table.len()
    }
488}
489
/// A lazy, seekable reader over a single MSFZ stream.
pub struct StreamReader<'a, F> {
    // The containing file; used for chunk lookups and raw reads.
    msfz: &'a Msfz<F>,
    // Total stream size in bytes (sum of fragment sizes), fixed at creation.
    size: u64,
    // The fragments that make up this stream, in order.
    fragments: &'a [Fragment],
    // Current position, used by the `Read` and `Seek` impls.
    pos: u64,
}
497
impl<'a, F> StreamReader<'a, F> {
    /// True if the stream contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.stream_size() == 0
    }

    /// Total size of the stream, in bytes.
    pub fn stream_size(&self) -> u64 {
        self.size
    }
}
511
512impl<'a, F: ReadAt> ReadAt for StreamReader<'a, F> {
513 fn read_at(&self, mut buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
514 if buf.is_empty() {
515 return Ok(0);
516 }
517
518 let original_buf_len = buf.len();
519 let mut current_offset: u64 = offset;
520
521 for fragment in self.fragments.iter() {
522 debug_assert!(!buf.is_empty());
523
524 if current_offset >= fragment.size as u64 {
525 current_offset -= fragment.size as u64;
526 continue;
527 }
528
529 let fragment_bytes_available = fragment.size - current_offset as u32;
531
532 let num_bytes_xfer = buf.len().min(fragment_bytes_available as usize);
533 let (buf_xfer, buf_rest) = buf.split_at_mut(num_bytes_xfer);
534 buf = buf_rest;
535
536 if fragment.location.is_compressed() {
537 let chunk_index = fragment.location.compressed_first_chunk();
538 let offset_within_chunk = fragment.location.compressed_offset_within_chunk();
539
540 let chunk_slice = self.msfz.get_chunk_slice(
541 chunk_index,
542 offset_within_chunk + current_offset as u32,
543 num_bytes_xfer as u32,
544 )?;
545 buf_xfer.copy_from_slice(chunk_slice);
546 } else {
547 let file_offset = fragment.location.uncompressed_file_offset();
549 self.msfz
550 .file
551 .read_exact_at(buf_xfer, file_offset + current_offset)?;
552 }
553
554 if buf.is_empty() {
555 break;
556 }
557
558 if current_offset >= num_bytes_xfer as u64 {
559 current_offset -= num_bytes_xfer as u64;
560 } else {
561 current_offset = 0;
562 }
563 }
564
565 Ok(original_buf_len - buf.len())
566 }
567}
568
569impl<'a, F: ReadAt> Read for StreamReader<'a, F> {
570 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
571 let n = self.read_at(buf, self.pos)?;
572 self.pos += n as u64;
573 Ok(n)
574 }
575}
576
577impl<'a, F> Seek for StreamReader<'a, F> {
578 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
579 match pos {
580 SeekFrom::Start(p) => self.pos = p,
581 SeekFrom::End(offset) => {
582 let new_pos = self.stream_size() as i64 + offset;
583 if new_pos < 0 {
584 return Err(std::io::ErrorKind::InvalidInput.into());
585 }
586 self.pos = new_pos as u64;
587 }
588 SeekFrom::Current(offset) => {
589 let new_pos = self.pos as i64 + offset;
590 if new_pos < 0 {
591 return Err(std::io::ErrorKind::InvalidInput.into());
592 }
593 self.pos = new_pos as u64;
594 }
595 }
596 Ok(self.pos)
597 }
598}
599
/// Output of [`decode_stream_dir`]: the parsed fragment table plus the
/// per-stream index ranges into it.
struct DecodedStreamDir {
    // All fragments of all streams, concatenated in stream order.
    fragments: Vec<Fragment>,
    // `num_streams + 1` entries; stream `i` owns
    // `fragments[stream_fragments[i]..stream_fragments[i + 1]]`.
    stream_fragments: Vec<u32>,
}
604
605fn decode_stream_dir(
606 stream_dir_bytes: &[u8],
607 num_streams: u32,
608 chunk_table: &[ChunkEntry],
609) -> anyhow::Result<DecodedStreamDir> {
610 let mut dec = Decoder {
611 bytes: stream_dir_bytes,
612 };
613
614 let mut fragments: Vec<Fragment> = Vec::new();
615 let mut stream_fragments: Vec<u32> = Vec::with_capacity(num_streams as usize + 1);
616
617 for _ in 0..num_streams {
618 stream_fragments.push(fragments.len() as u32);
619
620 let mut fragment_size = dec.u32()?;
621
622 if fragment_size == NIL_STREAM_SIZE {
623 fragments.push(Fragment {
627 size: 0,
628 location: FragmentLocation::NIL,
629 });
630 continue;
631 }
632
633 while fragment_size != 0 {
634 debug_assert_ne!(fragment_size, NIL_STREAM_SIZE);
635
636 let location_lo = dec.u32()?;
637 let location_hi = dec.u32()?;
638
639 if location_lo == u32::MAX && location_hi == u32::MAX {
640 bail!("The Stream Directory contains an invalid fragment record.");
641 }
642
643 let location = FragmentLocation {
644 lo: location_lo,
645 hi: location_hi,
646 };
647
648 if location.is_compressed() {
649 let first_chunk = location.compressed_first_chunk();
650 let offset_within_chunk = location.compressed_offset_within_chunk();
651
652 let Some(chunk) = chunk_table.get(first_chunk as usize) else {
653 bail!(
654 "The Stream Directory contains an invalid fragment record. Chunk index {first_chunk} exceeds the size of the chunk table."
655 );
656 };
657
658 let uncompressed_chunk_size = chunk.uncompressed_size.get();
659
660 if offset_within_chunk >= uncompressed_chunk_size {
664 bail!(
665 "The Stream Directory contains an invalid fragment record. offset_within_chunk {offset_within_chunk} exceeds the size of the chunk."
666 );
667 };
668
669 } else {
672 }
678
679 fragments.push(Fragment {
680 size: fragment_size,
681 location,
682 });
683
684 fragment_size = dec.u32()?;
687 if fragment_size == NIL_STREAM_SIZE {
688 bail!(
689 "Stream directory is malformed. It contains a non-initial fragment with size = NIL_STREAM_SIZE."
690 );
691 }
692 }
694 }
695
696 stream_fragments.push(fragments.len() as u32);
697
698 fragments.shrink_to_fit();
699
700 Ok(DecodedStreamDir {
701 fragments,
702 stream_fragments,
703 })
704}
705
/// Minimal forward-only cursor over a byte slice, used to decode the
/// little-endian integers of the stream directory.
struct Decoder<'a> {
    // The not-yet-consumed remainder of the input.
    bytes: &'a [u8],
}
709
710impl<'a> Decoder<'a> {
711 fn next_n<const N: usize>(&mut self) -> anyhow::Result<&'a [u8; N]> {
712 if self.bytes.len() < N {
713 bail!("Buffer ran out of bytes");
714 }
715
716 let (lo, hi) = self.bytes.split_at(N);
717 self.bytes = hi;
718 Ok(<&[u8; N]>::try_from(lo).unwrap())
721 }
722
723 fn u32(&mut self) -> anyhow::Result<u32> {
724 Ok(u32::from_le_bytes(*self.next_n()?))
725 }
726}
727
728fn map_alloc_error<T>(result: Result<T, zerocopy::AllocError>) -> anyhow::Result<T> {
729 match result {
730 Ok(value) => Ok(value),
731 Err(zerocopy::AllocError) => {
732 Err(std::io::Error::from(std::io::ErrorKind::OutOfMemory).into())
733 }
734 }
735}