// hexz_core/api/file.rs
1//! High-level archive file API and logical stream types.
2
3use crate::algo::compression::{Compressor, create_compressor};
4use crate::algo::encryption::Encryptor;
5use crate::cache::buffer_pool::BufferPool;
6use crate::cache::lru::{BlockCache, ShardedPageCache};
7use crate::cache::prefetch::Prefetcher;
8use crate::format::header::Header;
9use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
10use crate::format::version::{check_version, compatibility_message};
11use crate::store::StorageBackend;
12use bytes::Bytes;
13use crc32fast::hash as crc32_hash;
14use std::collections::HashMap;
15use std::mem::MaybeUninit;
16use std::ptr;
17use std::sync::{Arc, Mutex};
18
19use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
20use hexz_common::{Error, Result};
21use rayon::prelude::*;
22
/// A factory function that opens a parent archive by path.
///
/// Provided by the caller of [`Archive::with_cache_and_loader`] so that the
/// core read API has no hard dependency on any specific storage backend
/// implementation. Storage crates supply a concrete loader; callers that
/// know parents cannot exist may pass `None`.
pub type ParentLoader = Box<dyn Fn(&str) -> Result<Arc<Archive>> + Send + Sync>;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
/// All-zero initializer, so this lands in `.bss` and adds no binary size.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// A map from 32-byte block content hash to its location in the archive:
/// the stream it lives in, its global block index, and its `BlockInfo`.
type HashIndex = HashMap<[u8; 32], (ArchiveStream, u64, BlockInfo)>;

/// Work item for block decompression: (`block_idx`, info, `buf_offset`, `offset_in_block`, `to_copy`)
/// where `buf_offset` indexes the destination buffer and `offset_in_block`
/// indexes the decompressed block.
type WorkItem = (u64, BlockInfo, usize, usize, usize);
39
/// Result of fetching a block from cache or storage.
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from L1 cache or is a zero block).
    Decompressed(Bytes),
    /// Raw bytes as stored; caller must decompress (and verify) them.
    Compressed(Bytes),
}
51
/// Logical stream identifier for multi-stream archives.
///
/// Hexz archives can store independent data streams:
/// - **Main**: Primary data stream (e.g. file system image, main dataset)
/// - **Auxiliary**: Optional secondary data
///
/// `#[repr(u8)]` with explicit discriminants pins `Main = 0` and
/// `Auxiliary = 1` — presumably for on-disk/format stability; confirm before
/// relying on the numeric values elsewhere.
///
/// # Example
///
/// ```ignore
/// use hexz_core::{Archive, ArchiveStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<Archive>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from main stream
/// let data = snapshot.read_at(ArchiveStream::Main, 0, 4096)?;
///
/// // Read 4KB from auxiliary stream (if present)
/// let aux = snapshot.read_at(ArchiveStream::Auxiliary, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ArchiveStream {
    /// Main data stream
    Main = 0,
    /// Auxiliary data stream
    Auxiliary = 1,
}
80
/// Read-only interface for accessing Hexz archive data.
///
/// `Archive` is the primary API for reading compressed, block-indexed archives.
/// It handles:
/// - **Logical-to-Physical Mapping**: Translates byte offsets to blocks via index pages.
/// - **Compression**: Transparent decompression using LZ4 or Zstandard.
/// - **Encryption**: Transparent decryption using AES-256-GCM.
/// - **Caching**: Two-level caching (L1 decompressed blocks, L2 index pages).
/// - **Thin Archives**: Resolves missing blocks from parent archives.
/// - **Prefetching**: Asynchronous background loading of sequential blocks.
///
/// # Thread Safety
///
/// `Archive` is `Send + Sync`. All methods are thread-safe and utilize sharded
/// locks to minimize contention during concurrent reads.
pub struct Archive {
    /// Archive metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Decoded metadata bytes from the metadata section, if the header
    /// declared one (`metadata_offset`/`metadata_length`).
    pub metadata: Option<Vec<u8>>,

    /// Master index containing top-level page entries
    pub(crate) master: MasterIndex,

    /// Storage backend for reading raw archive data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent archive for thin (incremental) archives.
    /// When a block's offset is `BLOCK_OFFSET_PARENT`, data is fetched from parent.
    parents: Vec<Arc<Self>>,

    /// L1 Cache: Decompressed data blocks (sharded for concurrency)
    cache_l1: Arc<BlockCache>,

    /// L2 Cache: Deserialized index pages (sharded for concurrency)
    page_cache: Arc<ShardedPageCache>,

    /// Buffer pool for reusing decompression buffers (constructed for future use)
    _buffer_pool: Arc<BufferPool>,

    /// Sequential prefetch controller
    prefetcher: Option<Arc<Prefetcher>>,

    /// Lazy hash index for resolving `ParentRef` by content rather than offset.
    /// Built on first use by `get_hash_index`, then shared via `Arc`.
    hash_index: Mutex<Option<Arc<HashIndex>>>,
}
134
135impl std::fmt::Debug for Archive {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("Archive")
138            .field("version", &self.header.version)
139            .field("block_size", &self.header.block_size)
140            .field("compression", &self.header.compression)
141            .field("encrypted", &self.header.encryption.is_some())
142            .field("parents", &self.parents.len())
143            .finish_non_exhaustive()
144    }
145}
146
147impl Archive {
148    /// Opens a Hexz archive with default cache settings.
149    ///
150    /// This is the primary constructor for `Archive`. It:
151    /// 1. Reads and validates the archive header (magic bytes, version)
152    /// 2. Deserializes the master index
153    /// 3. Recursively loads parent archives (for thin archives)
154    /// 4. Initializes block and page caches
155    ///
156    /// # Parameters
157    ///
158    /// - `backend`: Implementation of [`StorageBackend`] (Local file, S3, etc.)
159    /// - `encryptor`: Optional decryptor (required if archive is encrypted)
160    ///
161    /// # Errors
162    ///
163    /// - `Error::Io`: Backend I/O failure or file not found.
164    /// - `Error::Format`: Invalid magic bytes or corrupted header.
165    /// - `Error::Encryption`: Missing or incorrect encryption key.
166    ///
167    /// # Example
168    ///
169    /// ```ignore
170    /// # use std::sync::Arc;
171    /// # use hexz_core::Archive;
172    /// # use hexz_store::local::FileBackend;
173    /// let backend = Arc::new(FileBackend::new("data.hxz".as_ref())?);
174    /// let archive = Archive::open(backend, None)?;
175    ///
176    /// println!("Main size: {} bytes", archive.size(ArchiveStream::Main));
177    /// # Ok::<(), Box<dyn std::error::Error>>(())
178    /// ```
179    pub fn open(
180        backend: Arc<dyn StorageBackend>,
181        encryptor: Option<Box<dyn Encryptor>>,
182    ) -> Result<Arc<Self>> {
183        Self::open_with_cache(backend, encryptor, None, None)
184    }
185
    /// Like [`open`](Self::open) but with custom cache capacity.
    ///
    /// - `cache_capacity_bytes`: L1 (decompressed block) cache budget;
    ///   `None` selects the crate default.
    /// - `prefetch_window_size`: sequential prefetch window; `None` or
    ///   `Some(0)` disables prefetching.
    ///
    /// NOTE(review): the header is read here to configure the compressor, and
    /// [`with_cache_and_loader`](Self::with_cache_and_loader) reads it again —
    /// one extra backend round-trip per open. Harmless, but worth folding.
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        // 1. Read header to determine compression type and dictionary
        let header = Header::read_from_backend(backend.as_ref())?;

        // 2. Validate version (this is the only open path that checks it)
        if !check_version(header.version).is_compatible() {
            return Err(Error::Format(compatibility_message(header.version)));
        }

        // 3. Load dictionary if present
        let dictionary = header.load_dictionary(backend.as_ref())?;

        // 4. Initialize compressor
        let compressor = create_compressor(header.compression, None, dictionary.as_deref());

        // 5. Recursively open with all settings
        // Note: For now we pass None for parent loader; higher-level crates
        // like hexz-store wrap this to provide a recursive parent loader.
        Self::with_cache_and_loader(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
            None,
        )
    }
219
220    /// Primary constructor for manual `Archive` initialization.
221    ///
222    /// This is the primary constructor used by `hexz-store` to supply a
223    /// configured compressor and backend.
224    pub fn new(
225        backend: Arc<dyn StorageBackend>,
226        compressor: Box<dyn Compressor>,
227        encryptor: Option<Box<dyn Encryptor>>,
228    ) -> Result<Arc<Self>> {
229        Self::with_cache(backend, compressor, encryptor, None, None)
230    }
231
232    /// Opens a Hexz archive with custom cache capacity and prefetching.
233    pub fn with_cache(
234        backend: Arc<dyn StorageBackend>,
235        compressor: Box<dyn Compressor>,
236        encryptor: Option<Box<dyn Encryptor>>,
237        cache_capacity_bytes: Option<usize>,
238        prefetch_window_size: Option<u32>,
239    ) -> Result<Arc<Self>> {
240        Self::with_cache_and_loader(
241            backend,
242            compressor,
243            encryptor,
244            cache_capacity_bytes,
245            prefetch_window_size,
246            None,
247        )
248    }
249
    /// Like [`with_cache`](Self::with_cache) but accepts an optional parent loader.
    ///
    /// The `parent_loader` is used to resolve parent archives for thin archives.
    /// If an archive declares parents but no loader is provided, blocks referring
    /// to parents will return zeros.
    ///
    /// # Errors
    ///
    /// - `Error::Encryption` if the header declares encryption but no
    ///   `encryptor` was supplied.
    /// - Any error from reading the header, master index, metadata section,
    ///   or from loading a parent archive via `parent_loader`.
    ///
    /// NOTE(review): unlike [`open_with_cache`](Self::open_with_cache), this
    /// path does not validate the format version — presumably callers have
    /// already done so; confirm before exposing this more widely.
    pub fn with_cache_and_loader(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
        parent_loader: Option<&ParentLoader>,
    ) -> Result<Arc<Self>> {
        // Read fixed header
        let header = Header::read_from_backend(backend.as_ref())?;

        // Verify encryption status match. (An encryptor supplied for an
        // unencrypted archive is accepted and simply unused.)
        if header.encryption.is_some() && encryptor.is_none() {
            return Err(Error::Encryption(
                "Archive is encrypted but no encryptor was provided".into(),
            ));
        }

        // Read master index
        let master = MasterIndex::read_from_backend(backend.as_ref(), header.index_offset)?;

        // Load metadata if present (requires both offset and length)
        let metadata = if let (Some(offset), Some(length)) =
            (header.metadata_offset, header.metadata_length)
        {
            Some(backend.read_exact(offset, length as usize)?.to_vec())
        } else {
            None
        };

        // Recursively load parent archives if a loader is provided.
        let mut parents = Vec::new();
        if let Some(loader) = parent_loader {
            for parent_path in &header.parent_paths {
                tracing::info!("Loading parent archive: {}", parent_path);
                parents.push(loader(parent_path)?);
            }
        } else if !header.parent_paths.is_empty() {
            // Not an error: reads still work, but parent-referencing blocks
            // will resolve to zeros (see `collect_work_items`).
            tracing::warn!(
                "Archive has {} parent path(s) but no parent_loader was provided; \
                 parent-reference blocks will not be resolvable.",
                header.parent_paths.len()
            );
        }

        // Initialize caches
        let cache_l1 = Arc::new(BlockCache::with_capacity(
            cache_capacity_bytes.unwrap_or(crate::cache::lru::DEFAULT_L1_CAPACITY),
        ));
        let page_cache = Arc::new(ShardedPageCache::default());
        let buffer_pool = Arc::new(BufferPool::new(
            crate::cache::buffer_pool::DEFAULT_POOL_SIZE,
        ));

        // Initialize prefetcher if window size > 0 (None or 0 disables it)
        let prefetcher = prefetch_window_size
            .filter(|&w| w > 0)
            .map(|w| Arc::new(Prefetcher::new(w)));

        Ok(Arc::new(Self {
            header,
            metadata,
            master,
            backend,
            compressor,
            encryptor,
            parents,
            cache_l1,
            page_cache,
            _buffer_pool: buffer_pool,
            prefetcher,
            hash_index: Mutex::new(None),
        }))
    }
329
330    /// Returns the logical size of a stream in bytes.
331    ///
332    /// # Parameters
333    ///
334    /// - `stream`: The stream to query (Main or Auxiliary)
335    ///
336    /// # Returns
337    ///
338    /// The uncompressed, logical size of the stream. This is the size you would
339    /// get if you decompressed all blocks and concatenated them.
340    ///
341    /// # Examples
342    ///
343    /// ```ignore
344    /// use hexz_core::{Archive, ArchiveStream};
345    /// # use std::sync::Arc;
346    /// # fn example(archive: Arc<Archive>) {
347    /// let disk_bytes = archive.size(ArchiveStream::Main);
348    /// let mem_bytes = archive.size(ArchiveStream::Auxiliary);
349    ///
350    /// println!("Main: {} GB", disk_bytes / (1024 * 1024 * 1024));
351    /// println!("Auxiliary: {} MB", mem_bytes / (1024 * 1024));
352    /// # }
353    /// ```
354    pub const fn size(&self, stream: ArchiveStream) -> u64 {
355        match stream {
356            ArchiveStream::Main => self.master.main_size,
357            ArchiveStream::Auxiliary => self.master.auxiliary_size,
358        }
359    }
360
361    /// Returns the total number of prefetch operations spawned since this file was opened.
362    /// Returns 0 if prefetching is disabled.
363    pub fn prefetch_spawn_count(&self) -> u64 {
364        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
365    }
366
367    /// Reads a single block from this archive.
368    pub fn read_block(
369        &self,
370        stream: ArchiveStream,
371        block_idx: u64,
372        info: &BlockInfo,
373    ) -> Result<Bytes> {
374        let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
375        match fetch_result {
376            FetchResult::Decompressed(d) => Ok(d),
377            FetchResult::Compressed(raw) => self.decompress_and_verify(&raw, block_idx, info),
378        }
379    }
380
381    /// Lazily builds and returns the hash index for this archive.
382    fn get_hash_index(&self) -> Result<Arc<HashIndex>> {
383        let mut index_guard = self
384            .hash_index
385            .lock()
386            .unwrap_or_else(std::sync::PoisonError::into_inner);
387        if let Some(index) = &*index_guard {
388            return Ok(index.clone());
389        }
390
391        tracing::debug!("Building hash index for archive...");
392        let mut map = HashMap::new();
393
394        // Index main stream
395        for page_entry in &self.master.main_pages {
396            let page = self.get_page(page_entry)?;
397            for (i, block) in page.blocks.iter().enumerate() {
398                if !block.is_sparse() && block.offset != BLOCK_OFFSET_PARENT {
399                    let global_idx = page_entry.start_block + i as u64;
400                    _ = map.insert(block.hash, (ArchiveStream::Main, global_idx, *block));
401                }
402            }
403        }
404
405        // Index auxiliary stream
406        for page_entry in &self.master.auxiliary_pages {
407            let page = self.get_page(page_entry)?;
408            for (i, block) in page.blocks.iter().enumerate() {
409                if !block.is_sparse() && block.offset != BLOCK_OFFSET_PARENT {
410                    let global_idx = page_entry.start_block + i as u64;
411                    _ = map.insert(block.hash, (ArchiveStream::Auxiliary, global_idx, *block));
412                }
413            }
414        }
415
416        let index = Arc::new(map);
417        *index_guard = Some(index.clone());
418        drop(index_guard);
419        Ok(index)
420    }
421
422    /// Finds a block in this archive by its hash.
423    pub fn get_block_by_hash(
424        &self,
425        hash: &[u8; 32],
426    ) -> Result<Option<(ArchiveStream, u64, BlockInfo)>> {
427        let index = self.get_hash_index()?;
428        Ok(index.get(hash).copied())
429    }
430
431    /// Iterates all non-sparse block hashes for the given stream.
432    ///
433    /// Used by `hexz-ops` to build a `ParentIndex` for cross-file deduplication
434    /// without requiring access to private fields.
435    pub fn iter_block_hashes(&self, stream: ArchiveStream) -> Result<Vec<[u8; 32]>> {
436        let pages = match stream {
437            ArchiveStream::Main => &self.master.main_pages,
438            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
439        };
440        let mut hashes = Vec::new();
441        for page_entry in pages {
442            let page: Arc<IndexPage> = self.get_page(page_entry)?;
443            for block_info in &page.blocks {
444                let info: &BlockInfo = block_info;
445                if !info.is_sparse() && info.hash != [0u8; 32] {
446                    hashes.push(info.hash);
447                }
448            }
449        }
450        Ok(hashes)
451    }
452
453    /// Returns the block metadata for a given logical offset.
454    pub fn get_block_info(
455        &self,
456        stream: ArchiveStream,
457        offset: u64,
458    ) -> Result<Option<(u64, BlockInfo)>> {
459        let pages = match stream {
460            ArchiveStream::Main => &self.master.main_pages,
461            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
462        };
463
464        if pages.is_empty() {
465            return Ok(None);
466        }
467
468        let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
469            Ok(idx) => idx,
470            Err(idx) => idx.saturating_sub(1),
471        };
472
473        let page_entry = &pages[page_idx];
474        let page: Arc<IndexPage> = self.get_page(page_entry)?;
475        let mut block_logical_start = page_entry.start_logical;
476
477        for (i, block) in page.blocks.iter().enumerate() {
478            let block_end = block_logical_start + block.logical_len as u64;
479            if offset >= block_logical_start && offset < block_end {
480                let global_idx = page_entry.start_block + i as u64;
481                return Ok(Some((global_idx, *block)));
482            }
483            block_logical_start = block_end;
484        }
485
486        Ok(None)
487    }
488
489    /// Reads data from an archive stream at a given offset.
490    ///
491    /// This is the main read method for random access. It:
492    /// 1. Identifies which blocks overlap the requested range
493    /// 2. Fetches blocks from cache or decompresses from storage
494    /// 3. Handles thin archive fallback to parent
495    /// 4. Assembles the final buffer from block slices
496    ///
497    /// # Parameters
498    ///
499    /// - `stream`: Which stream to read from (Main or Auxiliary)
500    /// - `offset`: Logical byte offset in the stream
501    /// - `len`: Number of bytes to read
502    ///
503    /// # Returns
504    ///
505    /// A `Vec<u8>` containing the requested data. If the request extends beyond
506    /// the end of the stream, it is truncated. If it starts beyond the end,
507    /// an empty vector is returned.
508    ///
509    /// # Example
510    ///
511    /// ```ignore
512    /// # use std::sync::Arc;
513    /// # use hexz_core::{Archive, ArchiveStream};
514    /// # fn example(archive: Arc<Archive>) -> hexz_common::Result<()> {
515    /// // Read first 512 bytes of main stream
516    /// let data = archive.read_at(ArchiveStream::Main, 0, 512)?;
517    /// # Ok(())
518    /// # }
519    /// ```
520    pub fn read_at(
521        self: &Arc<Self>,
522        stream: ArchiveStream,
523        offset: u64,
524        len: usize,
525    ) -> Result<Vec<u8>> {
526        let stream_size = self.size(stream);
527        if offset >= stream_size {
528            return Ok(Vec::new());
529        }
530        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
531        let mut buffer = vec![0u8; actual_len];
532        self.read_at_into(stream, offset, &mut buffer)?;
533        Ok(buffer)
534    }
535
536    /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
537    pub fn read_at_into(
538        self: &Arc<Self>,
539        stream: ArchiveStream,
540        offset: u64,
541        buffer: &mut [u8],
542    ) -> Result<()> {
543        if buffer.is_empty() {
544            return Ok(());
545        }
546        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
547        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
548        // The borrow is derived from `buffer` so no aliasing occurs.
549        let uninit =
550            unsafe { &mut *(std::ptr::from_mut::<[u8]>(buffer) as *mut [MaybeUninit<u8>]) };
551        self.read_at_into_uninit(stream, offset, uninit)
552    }
553
    /// Minimum number of local blocks to use the parallel decompression path.
    /// Below this, serial decompression is usually faster due to thread sync overhead.
    /// NOTE(review): heuristic constant — not tuned per machine.
    const PARALLEL_MIN_BLOCKS: usize = 4;
557
558    /// Collects work items for blocks that need decompression.
559    /// Handles zero blocks and parent-delegated blocks by writing to target immediately.
560    fn collect_work_items(
561        &self,
562        _stream: ArchiveStream,
563        pages: &[PageEntry],
564        page_idx: usize,
565        target: &mut [MaybeUninit<u8>],
566        offset: u64,
567        actual_len: usize,
568    ) -> Result<(Vec<WorkItem>, usize)> {
569        let mut work_items = Vec::new();
570        let mut current_pos = offset;
571        let mut remaining = actual_len;
572        let mut buf_offset = 0usize;
573
574        for page_entry in pages.iter().skip(page_idx) {
575            if remaining == 0 {
576                break;
577            }
578            // Stop if the current page starts after the end of our read range
579            if page_entry.start_logical > current_pos + remaining as u64 {
580                break;
581            }
582
583            let page = self.get_page(page_entry)?;
584            let mut block_logical_start = page_entry.start_logical;
585
586            for (i, block) in page.blocks.iter().enumerate() {
587                if remaining == 0 {
588                    break;
589                }
590                let block_end = block_logical_start + block.logical_len as u64;
591
592                // Check if this block overlaps with our read range
593                if block_end > current_pos {
594                    let offset_in_block = (current_pos - block_logical_start) as usize;
595                    let to_copy = std::cmp::min(
596                        remaining,
597                        (block.logical_len as usize).saturating_sub(offset_in_block),
598                    );
599
600                    // CASE 1: Zero block (sparse)
601                    if block.offset == 0 && block.length == 0 {
602                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
603                    }
604                    // CASE 2: Parent block (delegation)
605                    else if block.offset == BLOCK_OFFSET_PARENT {
606                        let mut found = false;
607                        for parent in &self.parents {
608                            if let Some((p_stream, p_idx, p_info)) =
609                                parent.get_block_by_hash(&block.hash)?
610                            {
611                                let data = parent.read_block(p_stream, p_idx, &p_info)?;
612
613                                // Copy the requested range from the parent block
614                                let src = &data[offset_in_block..offset_in_block + to_copy];
615                                // SAFETY: distinct ranges
616                                unsafe {
617                                    let dst_ptr = target.as_mut_ptr().add(buf_offset).cast::<u8>();
618                                    ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
619                                }
620
621                                found = true;
622                                break;
623                            }
624                        }
625                        if !found {
626                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
627                        }
628                    }
629                    // CASE 3: Data block (local)
630                    else {
631                        let global_idx = page_entry.start_block + i as u64;
632                        work_items.push((global_idx, *block, buf_offset, offset_in_block, to_copy));
633                    }
634
635                    current_pos += to_copy as u64;
636                    remaining -= to_copy;
637                    buf_offset += to_copy;
638                }
639                block_logical_start += block.logical_len as u64;
640            }
641        }
642
643        Ok((work_items, buf_offset))
644    }
645
646    /// Executes parallel decompression for multiple blocks.
647    /// Uses rayon to decompress blocks concurrently.
648    fn execute_parallel_decompression(
649        self: &Arc<Self>,
650        stream: ArchiveStream,
651        work_items: &[WorkItem],
652        target: &mut [MaybeUninit<u8>],
653    ) -> Result<()> {
654        let target_ptr = target.as_mut_ptr() as usize;
655        let results: Vec<Result<()>> = work_items
656            .par_iter()
657            .map(
658                |&(block_idx, ref info, buf_offset, offset_in_block, to_copy)| {
659                    // Fetch and decompress
660                    let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
661                    let data = match fetch_result {
662                        FetchResult::Decompressed(d) => d,
663                        FetchResult::Compressed(raw) => {
664                            self.decompress_and_verify(&raw, block_idx, info)?
665                        }
666                    };
667
668                    // Copy to target
669                    let src = &data[offset_in_block..offset_in_block + to_copy];
670                    // SAFETY: We are writing to a distinct, non-overlapping range of the target buffer
671                    // for each work item. buf_offset and to_copy ensure no bounds are exceeded.
672                    unsafe {
673                        let dst_ptr = (target_ptr + buf_offset) as *mut u8;
674                        ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
675                    }
676                    Ok(())
677                },
678            )
679            .collect();
680
681        // Propagate the first error encountered, if any
682        for r in results {
683            r?;
684        }
685        Ok(())
686    }
687
688    /// Executes serial decompression for a small number of blocks.
689    fn execute_serial_decompression(
690        &self,
691        stream: ArchiveStream,
692        work_items: &[WorkItem],
693        target: &mut [MaybeUninit<u8>],
694    ) -> Result<()> {
695        for &(block_idx, ref info, buf_offset, offset_in_block, to_copy) in work_items {
696            let fetch_result = self.fetch_raw_block(stream, block_idx, info)?;
697            let data = match fetch_result {
698                FetchResult::Decompressed(d) => d,
699                FetchResult::Compressed(raw) => {
700                    self.decompress_and_verify(&raw, block_idx, info)?
701                }
702            };
703
704            let src = &data[offset_in_block..offset_in_block + to_copy];
705            // SAFETY: Serial execution, distinct ranges.
706            unsafe {
707                let dst_ptr = target.as_mut_ptr().add(buf_offset).cast::<u8>();
708                ptr::copy_nonoverlapping(src.as_ptr(), dst_ptr, to_copy);
709            }
710        }
711        Ok(())
712    }
713
714    /// Fills uninitialized memory with zeros.
715    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
716        let mut remaining = buffer.len();
717        let mut offset = 0;
718        while remaining > 0 {
719            let to_copy = std::cmp::min(remaining, ZEROS_64K.len());
720            // SAFETY: `ZEROS_64K` is a static initialized array; `buffer` is a valid mutable
721            // slice of `MaybeUninit<u8>`. `offset + to_copy <= buffer.len()` is maintained by
722            // the loop, and `to_copy <= ZEROS_64K.len()`. Writing initialized bytes into
723            // `MaybeUninit<u8>` is always valid since `u8` has no invalid bit patterns.
724            unsafe {
725                ptr::copy_nonoverlapping(
726                    ZEROS_64K.as_ptr(),
727                    buffer.as_mut_ptr().add(offset).cast::<u8>(),
728                    to_copy,
729                );
730            }
731            remaining -= to_copy;
732            offset += to_copy;
733        }
734    }
735
736    /// Writes into uninitialized memory. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
737    ///
738    /// **On error:** The buffer contents are undefined (possibly partially written).
739    pub fn read_at_into_uninit(
740        self: &Arc<Self>,
741        stream: ArchiveStream,
742        offset: u64,
743        buffer: &mut [MaybeUninit<u8>],
744    ) -> Result<()> {
745        self.read_at_uninit_inner(stream, offset, buffer, false)
746    }
747
    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    ///
    /// Read strategy, in order:
    /// 1. Clamp the request to the stream size and zero-fill the out-of-range suffix.
    /// 2. If this archive has no index pages for the stream, delegate the whole read
    ///    to the first parent that covers `offset` (or zero-fill if none does).
    /// 3. Otherwise collect per-block work items, decompress them (in parallel when
    ///    there are at least `PARALLEL_MIN_BLOCKS` items), and fall back to parents
    ///    or zeros for any tail bytes the work items did not cover.
    /// 4. Optionally kick off a single background prefetch of the next range.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: ArchiveStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        // Reads entirely past the end of the stream succeed and yield zeros.
        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        // `target` is the in-range prefix; everything past it is already zeroed above.
        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            ArchiveStream::Main => &self.master.main_pages,
            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
        };

        // Delegate to parent if no index pages
        // NOTE(review): parent reads go through the public entry point, so
        // `is_prefetch` is not propagated and a parent's own prefetcher may
        // still spawn work — confirm this is intended.
        if pages.is_empty() {
            for parent in &self.parents {
                if parent.get_block_info(stream, offset)?.is_some() {
                    return parent.read_at_into_uninit(stream, offset, target);
                }
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        // On a miss, `Err(idx)` is the insertion point, so the page that may
        // contain `offset` is the one just before it (saturating at 0).
        let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        // `buf_offset` is how far into `target` the work items (plus any inline
        // parent/zero handling) reached; bytes past it are handled below.
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        let work_items_slice: &[WorkItem] = &work_items;
        if work_items_slice.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, work_items_slice, target)?;
        } else {
            self.execute_serial_decompression(stream, work_items_slice, target)?;
        }

        // Handle any remaining unprocessed data
        // (bytes the index pages did not cover): try parents first, else zeros.
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            let current_pos = offset + buf_offset as u64;
            let mut found = false;
            for parent in &self.parents {
                if parent.get_block_info(stream, current_pos)?.is_some() {
                    parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
                    found = true;
                    break;
                }
            }
            if !found {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                // Clone the Arc so the spawned task owns a handle to this archive.
                let snap = Arc::clone(self);
                let stream_copy = stream;
                rayon::spawn(move || {
                    // Best-effort: prefetch errors are deliberately ignored.
                    let _ = snap.warm_blocks(stream_copy, next_offset, prefetch_len);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }
853
854    /// Warms the block cache for the given byte range without allocating a target buffer.
855    ///
856    /// Unlike [`read_at_into_uninit`](Self::read_at_into_uninit), this method only fetches,
857    /// decompresses, and inserts blocks into the L1 cache. It skips blocks that are already
858    /// cached, zero-length, or parent-delegated. No output buffer is allocated or written to.
859    ///
860    /// Used by the prefetcher to reduce overhead: the old path allocated a throwaway buffer
861    /// of `block_size * 4` bytes and copied decompressed data into it, only to discard it.
862    fn warm_blocks(&self, stream: ArchiveStream, offset: u64, len: usize) -> Result<()> {
863        if len == 0 {
864            return Ok(());
865        }
866        let stream_size = self.size(stream);
867        if offset >= stream_size {
868            return Ok(());
869        }
870        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
871
872        let pages = match stream {
873            ArchiveStream::Main => &self.master.main_pages,
874            ArchiveStream::Auxiliary => &self.master.auxiliary_pages,
875        };
876        if pages.is_empty() {
877            return Ok(());
878        }
879
880        let page_idx: usize = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
881            Ok(idx) => idx,
882            Err(idx) => idx.saturating_sub(1),
883        };
884
885        let mut current_pos = offset;
886        let mut remaining = actual_len;
887
888        for page_entry in pages.iter().skip(page_idx) {
889            if remaining == 0 {
890                break;
891            }
892            if page_entry.start_logical > current_pos + remaining as u64 {
893                break;
894            }
895
896            let page: Arc<IndexPage> = self.get_page(page_entry)?;
897            let mut block_logical_start = page_entry.start_logical;
898
899            for (i, block) in page.blocks.iter().enumerate() {
900                if remaining == 0 {
901                    break;
902                }
903                let block_end = block_logical_start + block.logical_len as u64;
904
905                if block_end > current_pos {
906                    let offset_in_block = (current_pos - block_logical_start) as usize;
907                    let to_advance = std::cmp::min(
908                        remaining,
909                        (block.logical_len as usize).saturating_sub(offset_in_block),
910                    );
911
912                    // Only warm regular blocks (skip parent-delegated and zero blocks).
913                    // fetch_raw_block handles the cache check internally — on a hit it
914                    // returns Decompressed which we simply ignore via the Compressed match.
915                    if block.offset != BLOCK_OFFSET_PARENT && block.length > 0 {
916                        let global_idx = page_entry.start_block + i as u64;
917                        if let Ok(FetchResult::Compressed(raw)) =
918                            self.fetch_raw_block(stream, global_idx, block)
919                        {
920                            if let Ok(data) = self.decompress_and_verify(&raw, global_idx, block) {
921                                self.cache_l1.insert(stream, global_idx, data);
922                            }
923                        }
924                    }
925
926                    current_pos += to_advance as u64;
927                    remaining -= to_advance;
928                }
929                block_logical_start += block.logical_len as u64;
930            }
931        }
932
933        Ok(())
934    }
935
936    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
937    #[inline]
938    pub fn read_at_into_uninit_bytes(
939        self: &Arc<Self>,
940        stream: ArchiveStream,
941        offset: u64,
942        buf: &mut [u8],
943    ) -> Result<()> {
944        if buf.is_empty() {
945            return Ok(());
946        }
947        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
948        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
949        // The borrow is derived from `buf` so no aliasing occurs.
950        let uninit = unsafe { &mut *(std::ptr::from_mut::<[u8]>(buf) as *mut [MaybeUninit<u8>]) };
951        self.read_at_into_uninit(stream, offset, uninit)
952    }
953
954    /// Fetches an index page from cache or storage.
955    ///
956    /// Index pages map logical offsets to physical block locations. This method
957    /// maintains an LRU cache to avoid repeated deserialization.
958    ///
959    /// # Parameters
960    ///
961    /// - `entry`: Page metadata from master index
962    ///
963    /// # Returns
964    ///
965    /// A shared reference to the deserialized index page.
966    ///
967    /// # Thread Safety
968    ///
969    /// This method acquires a lock on the page cache only for cache lookup and insertion.
970    /// I/O and deserialization are performed without holding the lock to avoid blocking
971    /// other threads during cache misses.
972    pub(crate) fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
973        // Fast path: check sharded cache
974        if let Some(p) = self.page_cache.get(entry.offset) {
975            return Ok(p);
976        }
977
978        // Slow path: I/O and deserialization without holding any lock
979        let bytes = self
980            .backend
981            .read_exact(entry.offset, entry.length as usize)?;
982        let page: IndexPage = bincode::deserialize(&bytes)?;
983        let arc = Arc::new(page);
984
985        // Check again in case another thread inserted while we were doing I/O
986        if let Some(p) = self.page_cache.get(entry.offset) {
987            return Ok(p);
988        }
989        self.page_cache.insert(entry.offset, arc.clone());
990
991        Ok(arc)
992    }
993
994    /// Fetches raw compressed block data from cache or storage.
995    ///
996    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
997    /// It:
998    /// 1. Checks the block cache
999    /// 2. Handles zero-length blocks
1000    /// 3. Reads raw compressed data from backend
1001    ///
1002    /// # Parameters
1003    ///
1004    /// - `stream`: Stream identifier (for cache key)
1005    /// - `block_idx`: Global block index
1006    /// - `info`: Block metadata (offset, length)
1007    ///
1008    /// # Returns
1009    ///
1010    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
1011    fn fetch_raw_block(
1012        &self,
1013        stream: ArchiveStream,
1014        block_idx: u64,
1015        info: &BlockInfo,
1016    ) -> Result<FetchResult> {
1017        // Check cache first - return decompressed data if available
1018        if let Some(data) = self.cache_l1.get(stream, block_idx) {
1019            return Ok(FetchResult::Decompressed(data));
1020        }
1021
1022        // Handle zero blocks
1023        if info.offset == 0 && info.length == 0 {
1024            // Check if we can use the shared 64K zero block
1025            if info.logical_len == DEFAULT_BLOCK_SIZE {
1026                return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
1027            }
1028            return Ok(FetchResult::Decompressed(Bytes::from(vec![
1029                0u8;
1030                info.logical_len
1031                    as usize
1032            ])));
1033        }
1034
1035        // Read raw data from backend
1036        let raw = self.backend.read_exact(info.offset, info.length as usize)?;
1037        Ok(FetchResult::Compressed(raw))
1038    }
1039
1040    /// Decompresses and optionally decrypts a block.
1041    /// Validates the block checksum after decompression/decryption.
1042    fn decompress_and_verify(&self, raw: &[u8], block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
1043        // Verify checksum of final data (compressed + encrypted)
1044        let actual_checksum = crc32_hash(raw);
1045        if actual_checksum != info.checksum {
1046            return Err(Error::Format(format!(
1047                "Block {} checksum mismatch: expected {:08x}, got {:08x}",
1048                block_idx, info.checksum, actual_checksum
1049            )));
1050        }
1051
1052        let mut out = vec![0u8; info.logical_len as usize];
1053
1054        if let Some(ref enc) = self.encryptor {
1055            let compressed = enc.decrypt(raw, block_idx)?;
1056            _ = self.compressor.decompress_into(&compressed, &mut out)?;
1057        } else {
1058            _ = self.compressor.decompress_into(raw, &mut out)?;
1059        }
1060
1061        Ok(Bytes::from(out))
1062    }
1063}