hexz_core/api/file.rs
1//! High-level archive file API and logical stream types.
2
3use crate::algo::compression::{Compressor, create_compressor};
4use crate::algo::encryption::Encryptor;
5use crate::cache::buffer_pool::BufferPool;
6use crate::cache::lru::{BlockCache, ShardedPageCache};
7use crate::cache::prefetch::Prefetcher;
8use crate::format::header::Header;
9use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
10use crate::format::version::{check_version, compatibility_message};
11use crate::store::StorageBackend;
12use bytes::Bytes;
13use crc32fast::hash as crc32_hash;
14use std::collections::HashMap;
15use std::mem::MaybeUninit;
16use std::ptr;
17use std::sync::{Arc, Mutex};
18
19use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
20use hexz_common::{Error, Result};
21use rayon::prelude::*;
22
/// A factory function that opens a parent archive by path.
///
/// Provided by the caller of [`Archive::with_cache_and_loader`] so that the
/// core read API has no hard dependency on any specific storage backend
/// implementation. Storage crates supply a concrete loader; callers that
/// know parents cannot exist may pass `None`.
///
/// The loader receives the parent path exactly as stored in the archive
/// header; an error returned here aborts opening of the child archive.
pub type ParentLoader = Box<dyn Fn(&str) -> Result<Arc<Archive>> + Send + Sync>;
30
/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
///
/// Kept as a static so sparse/zero block data can be returned or copied
/// without a heap allocation per request.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];
33
/// A map from block hash to its location in the archive.
///
/// Values are `(stream, global block index, BlockInfo)`. Built lazily by
/// `Archive::get_hash_index` and shared behind an `Arc`.
type HashIndex = HashMap<[u8; 32], (ArchiveStream, u64, BlockInfo)>;
36
/// Work item for block decompression: (`block_idx`, info, `buf_offset`, `offset_in_block`, `to_copy`)
///
/// `buf_offset` is the destination offset in the caller's output buffer;
/// `offset_in_block` and `to_copy` select the byte range of the decompressed block.
type WorkItem = (u64, BlockInfo, usize, usize, usize);
39
/// Result of fetching a block from cache or storage (see `fetch_raw_block`).
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from L1 cache or is a zero block).
    Decompressed(Bytes),
    /// Data is raw compressed bytes from storage (needs decompression).
    Compressed(Bytes),
}
51
/// Logical stream identifier for multi-stream archives.
///
/// Hexz archives can store independent data streams:
/// - **Main**: Primary data stream (e.g. file system image, main dataset)
/// - **Auxiliary**: Optional secondary data
///
/// Discriminants are fixed by `repr(u8)` (`Main = 0`, `Auxiliary = 1`), so the
/// numeric values are stable across builds.
///
/// # Example
///
/// ```ignore
/// use hexz_core::{Archive, ArchiveStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<Archive>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from main stream
/// let data = snapshot.read_at(ArchiveStream::Main, 0, 4096)?;
///
/// // Read 4KB from auxiliary stream (if present)
/// let aux = snapshot.read_at(ArchiveStream::Auxiliary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ArchiveStream {
    /// Main data stream
    Main = 0,
    /// Auxiliary data stream
    Auxiliary = 1,
}
80
/// Read-only interface for accessing Hexz archive data.
///
/// `Archive` is the primary API for reading compressed, block-indexed archives.
/// It handles:
/// - **Logical-to-Physical Mapping**: Translates byte offsets to blocks via index pages.
/// - **Compression**: Transparent decompression using LZ4 or Zstandard.
/// - **Encryption**: Transparent decryption using AES-256-GCM.
/// - **Caching**: Two-level caching (L1 decompressed blocks, L2 index pages).
/// - **Thin Archives**: Resolves missing blocks from parent archives.
/// - **Prefetching**: Asynchronous background loading of sequential blocks.
///
/// # Thread Safety
///
/// `Archive` is `Send + Sync`. All methods are thread-safe and utilize sharded
/// locks to minimize contention during concurrent reads.
pub struct Archive {
    /// Archive metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Decoded metadata bytes from the metadata section
    pub metadata: Option<Vec<u8>>,

    /// Master index containing top-level page entries
    pub(crate) master: MasterIndex,

    /// Storage backend for reading raw archive data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent archive for thin (incremental) archives.
    /// When a block's offset is `BLOCK_OFFSET_PARENT`, data is fetched from parent.
    parents: Vec<Arc<Self>>,

    /// L1 Cache: Decompressed data blocks (sharded for concurrency)
    cache_l1: Arc<BlockCache>,

    /// L2 Cache: Deserialized index pages (sharded for concurrency)
    page_cache: Arc<ShardedPageCache>,

    /// Buffer pool for reusing decompression buffers (constructed for future use)
    _buffer_pool: Arc<BufferPool>,

    /// Sequential prefetch controller
    prefetcher: Option<Arc<Prefetcher>>,

    /// Lazy hash index for resolving `ParentRef` by content rather than offset.
    /// `None` until first use; built once by `get_hash_index` and then shared.
    hash_index: Mutex<Option<Arc<HashIndex>>>,
}
134
// Manual `Debug`: summarizes configuration only. Caches, backend, and codec
// trait objects are deliberately omitted (hence `finish_non_exhaustive`) —
// they are either unprintable or high-churn runtime state.
impl std::fmt::Debug for Archive {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Archive")
            .field("version", &self.header.version)
            .field("block_size", &self.header.block_size)
            .field("compression", &self.header.compression)
            .field("encrypted", &self.header.encryption.is_some())
            .field("parents", &self.parents.len())
            .finish_non_exhaustive()
    }
}
146
147impl Archive {
148 /// Opens a Hexz archive with default cache settings.
149 ///
150 /// This is the primary constructor for `Archive`. It:
151 /// 1. Reads and validates the archive header (magic bytes, version)
152 /// 2. Deserializes the master index
153 /// 3. Recursively loads parent archives (for thin archives)
154 /// 4. Initializes block and page caches
155 ///
156 /// # Parameters
157 ///
158 /// - `backend`: Implementation of [`StorageBackend`] (Local file, S3, etc.)
159 /// - `encryptor`: Optional decryptor (required if archive is encrypted)
160 ///
161 /// # Errors
162 ///
163 /// - `Error::Io`: Backend I/O failure or file not found.
164 /// - `Error::Format`: Invalid magic bytes or corrupted header.
165 /// - `Error::Encryption`: Missing or incorrect encryption key.
166 ///
167 /// # Example
168 ///
169 /// ```ignore
170 /// # use std::sync::Arc;
171 /// # use hexz_core::Archive;
172 /// # use hexz_store::local::FileBackend;
173 /// let backend = Arc::new(FileBackend::new("data.hxz".as_ref())?);
174 /// let archive = Archive::open(backend, None)?;
175 ///
176 /// println!("Main size: {} bytes", archive.size(ArchiveStream::Main));
177 /// # Ok::<(), Box<dyn std::error::Error>>(())
178 /// ```
179 pub fn open(
180 backend: Arc<dyn StorageBackend>,
181 encryptor: Option<Box<dyn Encryptor>>,
182 ) -> Result<Arc<Self>> {
183 Self::open_with_cache(backend, encryptor, None, None)
184 }
185
186 /// Like [`open`](Self::open) but with custom cache capacity.
187 pub fn open_with_cache(
188 backend: Arc<dyn StorageBackend>,
189 encryptor: Option<Box<dyn Encryptor>>,
190 cache_capacity_bytes: Option<usize>,
191 prefetch_window_size: Option<u32>,
192 ) -> Result<Arc<Self>> {
193 // 1. Read header to determine compression type and dictionary
194 let header = Header::read_from_backend(backend.as_ref())?;
195
196 // 2. Validate version
197 if !check_version(header.version).is_compatible() {
198 return Err(Error::Format(compatibility_message(header.version)));
199 }
200
201 // 3. Load dictionary if present
202 let dictionary = header.load_dictionary(backend.as_ref())?;
203
204 // 4. Initialize compressor
205 let compressor = create_compressor(header.compression, None, dictionary.as_deref());
206
207 // 5. Recursively open with all settings
208 // Note: For now we pass None for parent loader; higher-level crates
209 // like hexz-store wrap this to provide a recursive parent loader.
210 Self::with_cache_and_loader(
211 backend,
212 compressor,
213 encryptor,
214 cache_capacity_bytes,
215 prefetch_window_size,
216 None,
217 )
218 }
219
220 /// Primary constructor for manual `Archive` initialization.
221 ///
222 /// This is the primary constructor used by `hexz-store` to supply a
223 /// configured compressor and backend.
224 pub fn new(
225 backend: Arc<dyn StorageBackend>,
226 compressor: Box<dyn Compressor>,
227 encryptor: Option<Box<dyn Encryptor>>,
228 ) -> Result<Arc<Self>> {
229 Self::with_cache(backend, compressor, encryptor, None, None)
230 }
231
232 /// Opens a Hexz archive with custom cache capacity and prefetching.
233 pub fn with_cache(
234 backend: Arc<dyn StorageBackend>,
235 compressor: Box<dyn Compressor>,
236 encryptor: Option<Box<dyn Encryptor>>,
237 cache_capacity_bytes: Option<usize>,
238 prefetch_window_size: Option<u32>,
239 ) -> Result<Arc<Self>> {
240 Self::with_cache_and_loader(
241 backend,
242 compressor,
243 encryptor,
244 cache_capacity_bytes,
245 prefetch_window_size,
246 None,
247 )
248 }
249
250 /// Like [`with_cache`](Self::with_cache) but accepts an optional parent loader.
251 ///
252 /// The `parent_loader` is used to resolve parent archives for thin archives.
253 /// If an archive declares parents but no loader is provided, blocks referring
254 /// to parents will return zeros.
255 pub fn with_cache_and_loader(
256 backend: Arc<dyn StorageBackend>,
257 compressor: Box<dyn Compressor>,
258 encryptor: Option<Box<dyn Encryptor>>,
259 cache_capacity_bytes: Option<usize>,
260 prefetch_window_size: Option<u32>,
261 parent_loader: Option<&ParentLoader>,
262 ) -> Result<Arc<Self>> {
263 // Read fixed header
264 let header = Header::read_from_backend(backend.as_ref())?;
265
266 // Verify encryption status match
267 if header.encryption.is_some() && encryptor.is_none() {
268 return Err(Error::Encryption(
269 "Archive is encrypted but no encryptor was provided".into(),
270 ));
271 }
272
273 // Read master index
274 let master = MasterIndex::read_from_backend(backend.as_ref(), header.index_offset)?;
275
276 // Load metadata if present
277 let metadata = if let (Some(offset), Some(length)) =
278 (header.metadata_offset, header.metadata_length)
279 {
280 Some(backend.read_exact(offset, length as usize)?.to_vec())
281 } else {
282 None
283 };
284
285 // Recursively load parent archives if a loader is provided.
286 let mut parents = Vec::new();
287 if let Some(loader) = parent_loader {
288 for parent_path in &header.parent_paths {
289 tracing::info!("Loading parent archive: {}", parent_path);
290 parents.push(loader(parent_path)?);
291 }
292 } else if !header.parent_paths.is_empty() {
293 tracing::warn!(
294 "Archive has {} parent path(s) but no parent_loader was provided; \
295 parent-reference blocks will not be resolvable.",
296 header.parent_paths.len()
297 );
298 }
299
300 // Initialize caches
301 let cache_l1 = Arc::new(BlockCache::with_capacity(
302 cache_capacity_bytes.unwrap_or(crate::cache::lru::DEFAULT_L1_CAPACITY),
303 ));
304 let page_cache = Arc::new(ShardedPageCache::default());
305 let buffer_pool = Arc::new(BufferPool::new(
306 crate::cache::buffer_pool::DEFAULT_POOL_SIZE,
307 ));
308
309 // Initialize prefetcher if window size > 0
310 let prefetcher = prefetch_window_size
311 .filter(|&w| w > 0)
312 .map(|w| Arc::new(Prefetcher::new(w)));
313
314 Ok(Arc::new(Self {
315 header,
316 metadata,
317 master,
318 backend,
319 compressor,
320 encryptor,
321 parents,
322 cache_l1,
323 page_cache,
324 _buffer_pool: buffer_pool,
325 prefetcher,
326 hash_index: Mutex::new(None),
327 }))
328 }
329
330 /// Returns the logical size of a stream in bytes.
331 ///
332 /// # Parameters
333 ///
334 /// - `stream`: The stream to query (Main or Auxiliary)
335 ///
336 /// # Returns
337 ///
338 /// The uncompressed, logical size of the stream. This is the size you would
339 /// get if you decompressed all blocks and concatenated them.
340 ///
341 /// # Examples
342 ///
343 /// ```ignore
344 /// use hexz_core::{Archive, ArchiveStream};
345 /// # use std::sync::Arc;
346 /// # fn example(archive: Arc<Archive>) {
347 /// let disk_bytes = archive.size(ArchiveStream::Main);
348 /// let mem_bytes = archive.size(ArchiveStream::Auxiliary);
349 ///
350 /// println!("Main: {} GB", disk_bytes / (1024 * 1024 * 1024));
351 /// println!("Auxiliary: {} MB", mem_bytes / (1024 * 1024));
352 /// # }
353 /// ```
354 pub const fn size(&self, stream: ArchiveStream) -> u64 {
355 match stream {
356 ArchiveStream::Main => self.master.main_size,
357 ArchiveStream::Auxiliary => self.master.auxiliary_size,
358 }
359 }
360
361 /// Returns the total number of prefetch operations spawned since this file was opened.
362 /// Returns 0 if prefetching is disabled.
363 pub fn prefetch_spawn_count(&self) -> u64 {
364 self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
365 }
366
367 /// Reads a single block from this archive.
368 pub fn read_block(
369 &self,
370 stream: ArchiveStream,
371 block_idx: u64,
372 info: &BlockInfo,
373 ) -> Result<Bytes> {
374 let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
375 match fetch_result {
376 FetchResult::Decompressed(d) => Ok(d),
377 FetchResult::Compressed(raw) => self.decompress_and_verify(&raw, block_idx, info),
378 }
379 }
380
381 /// Lazily builds and returns the hash index for this archive.
382 fn get_hash_index(&self) -> Result<Arc<HashIndex>> {
383 let mut index_guard = self
384 .hash_index
385 .lock()
386 .unwrap_or_else(std::sync::PoisonError::into_inner);
387 if let Some(index) = &*index_guard {
388 return Ok(index.clone());
389 }
390
391 tracing::debug!("Building hash index for archive...");
392 let mut map = HashMap::new();
393
394 // Index main stream
395 for page_entry in &self.master.main_pages {
396 let page = self.get_page(page_entry)?;
397 for (i, block) in page.blocks.iter().enumerate() {
398 if !block.is_sparse() && block.offset != BLOCK_OFFSET_PARENT {
399 let global_idx = page_entry.start_block + i as u64;
400 _ = map.insert(block.hash, (ArchiveStream::Main, global_idx, *block));
401 }
402 }
403 }
404
405 // Index auxiliary stream
406 for page_entry in &self.master.auxiliary_pages {
407 let page = self.get_page(page_entry)?;
408 for (i, block) in page.blocks.iter().enumerate() {
409 if !block.is_sparse() && block.offset != BLOCK_OFFSET_PARENT {
410 let global_idx = page_entry.start_block + i as u64;
411 _ = map.insert(block.hash, (ArchiveStream::Auxiliary, global_idx, *block));
412 }
413 }
414 }
415
416 let index = Arc::new(map);
417 *index_guard = Some(index.clone());
418 drop(index_guard);
419 Ok(index)
420 }
421
422 /// Finds a block in this archive by its hash.
423 pub fn get_block_by_hash(
424 &self,
425 hash: &[u8; 32],
426 ) -> Result<Option<(ArchiveStream, u64, BlockInfo)>> {
427 let index = self.get_hash_index()?;
428 Ok(index.get(hash).copied())
429 }
430
431 /// Iterates all non-sparse block hashes for the given stream.
432 ///
433 /// Used by `hexz-ops` to build a `ParentIndex` for cross-file deduplication
434 /// without requiring access to private fields.
435 pub fn iter_block_hashes(&self, stream: ArchiveStream) -> Result<Vec<[u8; 32]>> {
436 let pages = match stream {
437 ArchiveStream::Main => &self.master.main_pages,
438 ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
439 };
440 let mut hashes = Vec::new();
441 for page_entry in pages {
442 let page: Arc<IndexPage> = self.get_page(page_entry)?;
443 for block_info in &page.blocks {
444 let info: &BlockInfo = block_info;
445 if !info.is_sparse() && info.hash != [0u8; 32] {
446 hashes.push(info.hash);
447 }
448 }
449 }
450 Ok(hashes)
451 }
452
453 /// Returns the block metadata for a given logical offset.
454 pub fn get_block_info(
455 &self,
456 stream: ArchiveStream,
457 offset: u64,
458 ) -> Result<Option<(u64, BlockInfo)>> {
459 let pages = match stream {
460 ArchiveStream::Main => &self.master.main_pages,
461 ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
462 };
463
464 if pages.is_empty() {
465 return Ok(None);
466 }
467
468 let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
469 Ok(idx) => idx,
470 Err(idx) => idx.saturating_sub(1),
471 };
472
473 let page_entry = &pages[page_idx];
474 let page: Arc<IndexPage> = self.get_page(page_entry)?;
475 let mut block_logical_start = page_entry.start_logical;
476
477 for (i, block) in page.blocks.iter().enumerate() {
478 let block_end = block_logical_start + block.logical_len as u64;
479 if offset >= block_logical_start && offset < block_end {
480 let global_idx = page_entry.start_block + i as u64;
481 return Ok(Some((global_idx, *block)));
482 }
483 block_logical_start = block_end;
484 }
485
486 Ok(None)
487 }
488
489 /// Reads data from an archive stream at a given offset.
490 ///
491 /// This is the main read method for random access. It:
492 /// 1. Identifies which blocks overlap the requested range
493 /// 2. Fetches blocks from cache or decompresses from storage
494 /// 3. Handles thin archive fallback to parent
495 /// 4. Assembles the final buffer from block slices
496 ///
497 /// # Parameters
498 ///
499 /// - `stream`: Which stream to read from (Main or Auxiliary)
500 /// - `offset`: Logical byte offset in the stream
501 /// - `len`: Number of bytes to read
502 ///
503 /// # Returns
504 ///
505 /// A `Vec<u8>` containing the requested data. If the request extends beyond
506 /// the end of the stream, it is truncated. If it starts beyond the end,
507 /// an empty vector is returned.
508 ///
509 /// # Example
510 ///
511 /// ```ignore
512 /// # use std::sync::Arc;
513 /// # use hexz_core::{Archive, ArchiveStream};
514 /// # fn example(archive: Arc<Archive>) -> hexz_common::Result<()> {
515 /// // Read first 512 bytes of main stream
516 /// let data = archive.read_at(ArchiveStream::Main, 0, 512)?;
517 /// # Ok(())
518 /// # }
519 /// ```
520 pub fn read_at(
521 self: &Arc<Self>,
522 stream: ArchiveStream,
523 offset: u64,
524 len: usize,
525 ) -> Result<Vec<u8>> {
526 let stream_size = self.size(stream);
527 if offset >= stream_size {
528 return Ok(Vec::new());
529 }
530 let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
531 let mut buffer = vec![0u8; actual_len];
532 self.read_at_into(stream, offset, &mut buffer)?;
533 Ok(buffer)
534 }
535
536 /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
537 pub fn read_at_into(
538 self: &Arc<Self>,
539 stream: ArchiveStream,
540 offset: u64,
541 buffer: &mut [u8],
542 ) -> Result<()> {
543 if buffer.is_empty() {
544 return Ok(());
545 }
546 // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
547 // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
548 // The borrow is derived from `buffer` so no aliasing occurs.
549 let uninit =
550 unsafe { &mut *(std::ptr::from_mut::<[u8]>(buffer) as *mut [MaybeUninit<u8>]) };
551 self.read_at_into_uninit(stream, offset, uninit)
552 }
553
    /// Minimum number of local blocks to use the parallel decompression path.
    /// Below this, serial decompression is usually faster due to thread sync overhead.
    /// Compared against the collected work-item count in `read_at_uninit_inner`.
    const PARALLEL_MIN_BLOCKS: usize = 4;
557
    /// Collects work items for blocks that need decompression.
    /// Handles zero blocks and parent-delegated blocks by writing to target immediately.
    ///
    /// Walks index pages starting at `page_idx` and, for every block overlapping
    /// `[offset, offset + actual_len)`:
    /// - sparse (zero) blocks are zero-filled into `target` immediately;
    /// - parent-delegated blocks are resolved by content hash against each
    ///   parent in order and copied immediately (zero-filled if no parent has
    ///   the block);
    /// - regular local blocks are queued as [`WorkItem`]s for later
    ///   (de)compression by the serial or parallel executor.
    ///
    /// Returns the queued work items plus the number of bytes of `target` that
    /// were written or claimed; the caller handles any remaining suffix.
    fn collect_work_items(
        &self,
        _stream: ArchiveStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut work_items = Vec::new();
        let mut current_pos = offset;
        let mut remaining = actual_len;
        let mut buf_offset = 0usize;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            // Stop if the current page starts after the end of our read range
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (i, block) in page.blocks.iter().enumerate() {
                if remaining == 0 {
                    break;
                }
                let block_end = block_logical_start + block.logical_len as u64;

                // Check if this block overlaps with our read range
                if block_end > current_pos {
                    // NOTE(review): assumes blocks are logically contiguous from the
                    // binary-searched page onward, so `current_pos` never precedes
                    // `block_logical_start` (otherwise this subtraction would wrap) —
                    // confirm index invariant.
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    // CASE 1: Zero block (sparse)
                    if block.offset == 0 && block.length == 0 {
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                    }
                    // CASE 2: Parent block (delegation)
                    else if block.offset == BLOCK_OFFSET_PARENT {
                        let mut found = false;
                        // Probe parents in declaration order; first content-hash match wins.
                        for parent in &self.parents {
                            if let Some((p_stream, p_idx, p_info)) =
                                parent.get_block_by_hash(&block.hash)?
                            {
                                let data = parent.read_block(p_stream, p_idx, &p_info)?;

                                // Copy the requested range from the parent block
                                let src = &data[offset_in_block..offset_in_block + to_copy];
                                // SAFETY: distinct ranges
                                unsafe {
                                    let dst_ptr = target.as_mut_ptr().add(buf_offset).cast::<u8>();
                                    ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
                                }

                                found = true;
                                break;
                            }
                        }
                        // No parent had the block: treat as zeros rather than erroring.
                        if !found {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                    }
                    // CASE 3: Data block (local)
                    else {
                        let global_idx = page_entry.start_block + i as u64;
                        work_items.push((global_idx, *block, buf_offset, offset_in_block, to_copy));
                    }

                    current_pos += to_copy as u64;
                    remaining -= to_copy;
                    buf_offset += to_copy;
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((work_items, buf_offset))
    }
645
646 /// Executes parallel decompression for multiple blocks.
647 /// Uses rayon to decompress blocks concurrently.
648 fn execute_parallel_decompression(
649 self: &Arc<Self>,
650 stream: ArchiveStream,
651 work_items: &[WorkItem],
652 target: &mut [MaybeUninit<u8>],
653 ) -> Result<()> {
654 let target_ptr = target.as_mut_ptr() as usize;
655 let results: Vec<Result<()>> = work_items
656 .par_iter()
657 .map(
658 |&(block_idx, ref info, buf_offset, offset_in_block, to_copy)| {
659 // Fetch and decompress
660 let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
661 let data = match fetch_result {
662 FetchResult::Decompressed(d) => d,
663 FetchResult::Compressed(raw) => {
664 self.decompress_and_verify(&raw, block_idx, info)?
665 }
666 };
667
668 // Copy to target
669 let src = &data[offset_in_block..offset_in_block + to_copy];
670 // SAFETY: We are writing to a distinct, non-overlapping range of the target buffer
671 // for each work item. buf_offset and to_copy ensure no bounds are exceeded.
672 unsafe {
673 let dst_ptr = (target_ptr + buf_offset) as *mut u8;
674 ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
675 }
676 Ok(())
677 },
678 )
679 .collect();
680
681 // Propagate the first error encountered, if any
682 for r in results {
683 r?;
684 }
685 Ok(())
686 }
687
688 /// Executes serial decompression for a small number of blocks.
689 fn execute_serial_decompression(
690 &self,
691 stream: ArchiveStream,
692 work_items: &[WorkItem],
693 target: &mut [MaybeUninit<u8>],
694 ) -> Result<()> {
695 for &(block_idx, ref info, buf_offset, offset_in_block, to_copy) in work_items {
696 let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
697 let data = match fetch_result {
698 FetchResult::Decompressed(d) => d,
699 FetchResult::Compressed(raw) => {
700 self.decompress_and_verify(&raw, block_idx, info)?
701 }
702 };
703
704 let src = &data[offset_in_block..offset_in_block + to_copy];
705 // SAFETY: Serial execution, distinct ranges.
706 unsafe {
707 let dst_ptr = target.as_mut_ptr().add(buf_offset).cast::<u8>();
708 ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
709 }
710 }
711 Ok(())
712 }
713
714 /// Fills uninitialized memory with zeros.
715 fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
716 let mut remaining = buffer.len();
717 let mut offset = 0;
718 while remaining > 0 {
719 let to_copy = std::cmp::min(remaining, ZEROS_64K.len());
720 // SAFETY: `ZEROS_64K` is a static initialized array; `buffer` is a valid mutable
721 // slice of `MaybeUninit<u8>`. `offset + to_copy <= buffer.len()` is maintained by
722 // the loop, and `to_copy <= ZEROS_64K.len()`. Writing initialized bytes into
723 // `MaybeUninit<u8>` is always valid since `u8` has no invalid bit patterns.
724 unsafe {
725 ptr::copy_nonoverlapping(
726 ZEROS_64K.as_ptr(),
727 buffer.as_mut_ptr().add(offset).cast::<u8>(),
728 to_copy,
729 );
730 }
731 remaining -= to_copy;
732 offset += to_copy;
733 }
734 }
735
736 /// Writes into uninitialized memory. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
737 ///
738 /// **On error:** The buffer contents are undefined (possibly partially written).
739 pub fn read_at_into_uninit(
740 self: &Arc<Self>,
741 stream: ArchiveStream,
742 offset: u64,
743 buffer: &mut [MaybeUninit<u8>],
744 ) -> Result<()> {
745 self.read_at_uninit_inner(stream, offset, buffer, false)
746 }
747
    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    ///
    /// Pipeline: clamp the range to the stream size (zero-filling any suffix),
    /// delegate wholly to a parent when this archive has no index pages,
    /// otherwise collect work items (which also resolves sparse and
    /// parent-delegated blocks inline), decompress them serially or in
    /// parallel, zero-fill or parent-delegate any trailing gap, and finally
    /// kick off an asynchronous prefetch of the next sequential range.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: ArchiveStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        // Reads entirely past the end of the stream yield all zeros.
        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            ArchiveStream::Main => &self.master.main_pages,
            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at_into_uninit(stream, offset, target);
                }
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        let work_items_slice: &[WorkItem] = &work_items;
        if work_items_slice.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, work_items_slice, target)?;
        } else {
            self.execute_serial_decompression(stream, work_items_slice, target)?;
        }

        // Handle any remaining unprocessed data
        // (bytes past the last indexed block: try parents, else zeros).
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            let current_pos = offset + buf_offset as u64;
            let mut found = false;
            for parent in &self.parents {
                if parent.get_block_info(stream, current_pos)?.is_some() {
                    parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
                    found = true;
                    break;
                }
            }
            if !found {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                rayon::spawn(move || {
                    // Best-effort: prefetch errors are deliberately dropped.
                    let _ = snap.warm_blocks(stream_copy, next_offset, prefetch_len);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }
853
    /// Warms the block cache for the given byte range without allocating a target buffer.
    ///
    /// Unlike [`read_at_into_uninit`](Self::read_at_into_uninit), this method only fetches,
    /// decompresses, and inserts blocks into the L1 cache. It skips blocks that are already
    /// cached, zero-length, or parent-delegated. No output buffer is allocated or written to.
    ///
    /// Used by the prefetcher to reduce overhead: the old path allocated a throwaway buffer
    /// of `block_size * 4` bytes and copied decompressed data into it, only to discard it.
    ///
    /// Per-block fetch/decompress errors are intentionally swallowed (`if let Ok`):
    /// warming is best-effort, and the real read will surface any failure.
    fn warm_blocks(&self, stream: ArchiveStream, offset: u64, len: usize) -> Result<()> {
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(());
        }
        // Clamp to end of stream, mirroring the main read path.
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;

        let pages = match stream {
            ArchiveStream::Main => &self.master.main_pages,
            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
        };
        if pages.is_empty() {
            return Ok(());
        }

        // Binary search for the page covering `offset` (same convention as
        // `get_block_info`: on a miss, step back to the preceding page).
        let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            // Page starts past the end of the warm range: done.
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page: Arc<IndexPage> = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (i, block) in page.blocks.iter().enumerate() {
                if remaining == 0 {
                    break;
                }
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_advance = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    // Only warm regular blocks (skip parent-delegated and zero blocks).
                    // fetch_raw_block handles the cache check internally — on a hit it
                    // returns Decompressed which we simply ignore via the Compressed match.
                    if block.offset != BLOCK_OFFSET_PARENT && block.length > 0 {
                        let global_idx = page_entry.start_block + i as u64;
                        if let Ok(FetchResult::Compressed(raw)) =
                            self.fetch_raw_block(stream, global_idx, block)
                        {
                            if let Ok(data) = self.decompress_and_verify(&raw, global_idx, block) {
                                self.cache_l1.insert(stream, global_idx, data);
                            }
                        }
                    }

                    current_pos += to_advance as u64;
                    remaining -= to_advance;
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok(())
    }
935
936 /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
937 #[inline]
938 pub fn read_at_into_uninit_bytes(
939 self: &Arc<Self>,
940 stream: ArchiveStream,
941 offset: u64,
942 buf: &mut [u8],
943 ) -> Result<()> {
944 if buf.is_empty() {
945 return Ok(());
946 }
947 // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
948 // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
949 // The borrow is derived from `buf` so no aliasing occurs.
950 let uninit = unsafe { &mut *(std::ptr::from_mut::<[u8]>(buf) as *mut [MaybeUninit<u8>]) };
951 self.read_at_into_uninit(stream, offset, uninit)
952 }
953
954 /// Fetches an index page from cache or storage.
955 ///
956 /// Index pages map logical offsets to physical block locations. This method
957 /// maintains an LRU cache to avoid repeated deserialization.
958 ///
959 /// # Parameters
960 ///
961 /// - `entry`: Page metadata from master index
962 ///
963 /// # Returns
964 ///
965 /// A shared reference to the deserialized index page.
966 ///
967 /// # Thread Safety
968 ///
969 /// This method acquires a lock on the page cache only for cache lookup and insertion.
970 /// I/O and deserialization are performed without holding the lock to avoid blocking
971 /// other threads during cache misses.
972 pub(crate) fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
973 // Fast path: check sharded cache
974 if let Some(p) = self.page_cache.get(entry.offset) {
975 return Ok(p);
976 }
977
978 // Slow path: I/O and deserialization without holding any lock
979 let bytes = self
980 .backend
981 .read_exact(entry.offset, entry.length as usize)?;
982 let page: IndexPage = bincode::deserialize(&bytes)?;
983 let arc = Arc::new(page);
984
985 // Check again in case another thread inserted while we were doing I/O
986 if let Some(p) = self.page_cache.get(entry.offset) {
987 return Ok(p);
988 }
989 self.page_cache.insert(entry.offset, arc.clone());
990
991 Ok(arc)
992 }
993
994 /// Fetches raw compressed block data from cache or storage.
995 ///
996 /// This is the I/O portion of block resolution, separated to enable parallel I/O.
997 /// It:
998 /// 1. Checks the block cache
999 /// 2. Handles zero-length blocks
1000 /// 3. Reads raw compressed data from backend
1001 ///
1002 /// # Parameters
1003 ///
1004 /// - `stream`: Stream identifier (for cache key)
1005 /// - `block_idx`: Global block index
1006 /// - `info`: Block metadata (offset, length)
1007 ///
1008 /// # Returns
1009 ///
1010 /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
1011 fn fetch_raw_block(
1012 &self,
1013 stream: ArchiveStream,
1014 block_idx: u64,
1015 info: &BlockInfo,
1016 ) -> Result<FetchResult> {
1017 // Check cache first - return decompressed data if available
1018 if let Some(data) = self.cache_l1.get(stream, block_idx) {
1019 return Ok(FetchResult::Decompressed(data));
1020 }
1021
1022 // Handle zero blocks
1023 if info.offset == 0 && info.length == 0 {
1024 // Check if we can use the shared 64K zero block
1025 if info.logical_len == DEFAULT_BLOCK_SIZE {
1026 return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
1027 }
1028 return Ok(FetchResult::Decompressed(Bytes::from(vec![
1029 0u8;
1030 info.logical_len
1031 as usize
1032 ])));
1033 }
1034
1035 // Read raw data from backend
1036 let raw = self.backend.read_exact(info.offset, info.length as usize)?;
1037 Ok(FetchResult::Compressed(raw))
1038 }
1039
1040 /// Decompresses and optionally decrypts a block.
1041 /// Validates the block checksum after decompression/decryption.
1042 fn decompress_and_verify(&self, raw: &[u8], block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
1043 // Verify checksum of final data (compressed + encrypted)
1044 let actual_checksum = crc32_hash(raw);
1045 if actual_checksum != info.checksum {
1046 return Err(Error::Format(format!(
1047 "Block {} checksum mismatch: expected {:08x}, got {:08x}",
1048 block_idx, info.checksum, actual_checksum
1049 )));
1050 }
1051
1052 let mut out = vec![0u8; info.logical_len as usize];
1053
1054 if let Some(ref enc) = self.encryptor {
1055 let compressed = enc.decrypt(raw, block_idx)?;
1056 _ = self.compressor.decompress_into(&compressed, &mut out)?;
1057 } else {
1058 _ = self.compressor.decompress_into(raw, &mut out)?;
1059 }
1060
1061 Ok(Bytes::from(out))
1062 }
1063}