hexz_core/api/file.rs

//! High-level snapshot file API and logical stream types.

use crate::algo::compression::{Compressor, create_compressor};
use crate::algo::encryption::Encryptor;
use crate::cache::lru::{BlockCache, ShardedPageCache};
use crate::cache::prefetch::Prefetcher;
use crate::format::header::Header;
use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
use crate::store::StorageBackend;
use crate::store::local::file::FileBackend;
use bytes::Bytes;
use crc32fast::hash as crc32_hash;
use std::mem::MaybeUninit;
use std::path::Path;
use std::ptr;
use std::sync::{Arc, Mutex};

use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
use hexz_common::{Error, Result};
use rayon::prelude::*;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
type WorkItem = (u64, BlockInfo, usize, usize, usize);

/// Result of fetching a block from cache or storage.
///
/// Eliminates TOCTOU races by tracking data state at fetch time rather than
/// re-checking the cache later (which can give a different answer if a
/// background prefetch thread modifies the cache between check and use).
enum FetchResult {
    /// Data is already decompressed (came from L1 cache or is a zero block).
    Decompressed(Bytes),
    /// Data is raw compressed bytes from storage (needs decompression).
    Compressed(Bytes),
}

/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Disk**: Persistent storage (disk image, filesystem data)
/// - **Memory**: Volatile state (RAM contents, process memory)
///
/// # Example
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from disk stream
/// let disk_data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
///
/// // Read 4KB from memory stream (if present)
/// let mem_data = snapshot.read_at(SnapshotStream::Memory, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent disk/storage stream
    Disk = 0,
    /// Volatile memory stream
    Memory = 1,
}

/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (disk and memory)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```no_run
/// use hexz_core::File;
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_path field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Disk, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshot for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from parent.
    parent: Option<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// Sharded LRU cache for deserialized index pages
    page_cache: ShardedPageCache,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,
}

impl File {
    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This eliminates the three-step boilerplate of reading the header, loading the
    /// dictionary, and creating the compressor; it is equivalent to
    /// `File::new(backend, auto_compressor, encryptor)`.
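    ///
    /// # Examples
    ///
    /// A minimal sketch (the snapshot path is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // The compressor (and optional dictionary) are detected from the header,
    /// // so only the backend and an optional encryptor are needed.
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let snapshot = File::open(backend, None)?;
    ///
    /// let data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
    /// # Ok(())
    /// # }
    /// ```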
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }

    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
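    ///
    /// A minimal sketch (cache size and prefetch window are illustrative values):
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    ///
    /// // 128MB block cache, prefetch 4 blocks ahead
    /// let snapshot = File::open_with_cache(backend, None, Some(128 * 1024 * 1024), Some(4))?;
    /// # Ok(())
    /// # }
    /// ```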
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
        )
    }

    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(Arc<File>)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if storage backend fails
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    /// let snapshot = File::new(backend, compressor, None)?;
    ///
    /// println!("Disk size: {} bytes", snapshot.size(SnapshotStream::Disk));
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }

    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: 1000 blocks)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next N blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4)
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
        let header: Header = bincode::deserialize(&header_bytes)?;

        if &header.magic != MAGIC_BYTES {
            return Err(Error::Format("Invalid magic bytes".into()));
        }

        // Check version compatibility
        let compatibility = check_version(header.version);
        match compatibility {
            VersionCompatibility::Full => {
                // Perfect match, proceed silently
            }
            VersionCompatibility::Degraded => {
                // Newer version, issue warning but allow
                tracing::warn!("{}", compatibility_message(header.version));
            }
            VersionCompatibility::Incompatible => {
                // Too old or too new, reject
                return Err(Error::Format(compatibility_message(header.version)));
            }
        }

        let file_len = backend.len();
        if header.index_offset >= file_len {
            return Err(Error::Format(format!(
                "index_offset ({}) is at or past end of file ({})",
                header.index_offset, file_len
            )));
        }
        let index_bytes = backend.read_exact(
            header.index_offset,
            (file_len - header.index_offset) as usize,
        )?;

        let master: MasterIndex = bincode::deserialize(&index_bytes)?;

        // Recursively load parent if present
        let parent = if let Some(parent_path) = &header.parent_path {
            tracing::info!("Loading parent snapshot: {}", parent_path);
            let p_backend = Arc::new(FileBackend::new(Path::new(parent_path))?);
            Some(File::open(p_backend, None)?)
        } else {
            None
        };

        let block_size = header.block_size as usize;
        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
            (bytes / block_size).max(1)
        } else {
            1000
        };

        // Initialize prefetcher if window size is specified and > 0
        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);

        Ok(Arc::new(Self {
            header,
            master,
            backend,
            compressor,
            encryptor,
            parent,
            cache_l1: BlockCache::with_capacity(l1_capacity),
            page_cache: ShardedPageCache::default(),
            prefetcher,
        }))
    }

    /// Returns the total number of prefetch operations spawned since this file was opened.
    ///
    /// Returns 0 if prefetching is disabled.
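    ///
    /// A minimal sketch (purely illustrative; the count depends on the access pattern
    /// and whether a prefetch window was configured):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
    /// println!("prefetches spawned so far: {}", snapshot.prefetch_spawn_count());
    /// # Ok(())
    /// # }
    /// ```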
    pub fn prefetch_spawn_count(&self) -> u64 {
        self.prefetcher.as_ref().map_or(0, |p| p.spawn_count())
    }

    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Disk or Memory)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Disk);
    /// let mem_bytes = snapshot.size(SnapshotStream::Memory);
    ///
    /// println!("Disk: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Memory: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Disk => self.master.disk_size,
            SnapshotStream::Memory => self.master.memory_size,
        }
    }

    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// When the range spans multiple blocks, the blocks are decompressed in parallel.
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (Disk or Memory)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns empty vector)
    /// - `offset + len` exceeds stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (offset % block_size == 0) are most efficient.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of disk stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Disk, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Disk, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at(stream, offset, actual_len);
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
        buf.resize_with(actual_len, MaybeUninit::uninit);
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }

    /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
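    ///
    /// A minimal sketch (buffer size and offset are illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Reuse one buffer across reads to avoid a per-call allocation.
    /// let mut buf = vec![0u8; 64 * 1024];
    /// snapshot.read_at_into(SnapshotStream::Disk, 1024 * 1024, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```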
    pub fn read_at_into(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [u8],
    ) -> Result<()> {
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            buffer.fill(0);
            return Ok(());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            buffer[actual_len..].fill(0);
        }
        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
    }

    /// Minimum number of local blocks to use the parallel decompression path.
    const PARALLEL_MIN_BLOCKS: usize = 2;

    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to work queue for later decompression
    ///
    /// Returns the queued work items together with the number of buffer bytes
    /// already accounted for (zero-filled, read from the parent, or queued).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut local_work: Vec<WorkItem> = Vec::new();
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate or zero-fill
                        if let Some(parent) = &self.parent {
                            let dest = &mut target[buf_offset..buf_offset + to_copy];
                            parent.read_at_into_uninit(stream, current_pos, dest)?;
                        } else {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }

    /// Executes parallel decompression for multiple blocks.
    ///
    /// Uses a two-phase approach:
    /// 1. Parallel I/O: Fetch all raw blocks concurrently
    /// 2. Parallel CPU: Decompress and copy to target buffer
    fn execute_parallel_decompression(
        self: &Arc<Self>,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        let snap = Arc::clone(self);
        let target_addr = target.as_mut_ptr() as usize;

        // Phase 1: Parallel fetch all raw blocks. Each result tracks whether
        // the data is already decompressed (cache hit / zero block) or still
        // compressed (storage read), eliminating a TOCTOU race where a background
        // prefetch thread could modify the cache between fetch and decompression.
        let raw_blocks: Vec<Result<FetchResult>> = work_items
            .par_iter()
            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
            .collect();

        // Phase 2: Parallel decompress and copy
        let err: Mutex<Option<Error>> = Mutex::new(None);
        work_items
            .par_iter()
            .zip(raw_blocks)
            .for_each(|(work_item, fetch_result)| {
                if err.lock().map_or(true, |e| e.is_some()) {
                    return;
                }

                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;

                // Handle fetch errors
                let fetched = match fetch_result {
                    Ok(r) => r,
                    Err(e) => {
                        if let Ok(mut guard) = err.lock() {
                            let _ = guard.replace(e);
                        }
                        return;
                    }
                };

                // Use the FetchResult to determine if decompression is needed,
                // rather than re-checking the cache (which could give a stale answer).
                let data = match fetched {
                    FetchResult::Decompressed(data) => data,
                    FetchResult::Compressed(raw) => {
                        match snap.decompress_and_verify(raw, *block_idx, info) {
                            Ok(d) => {
                                // Cache the result
                                snap.cache_l1.insert(stream, *block_idx, d.clone());
                                d
                            }
                            Err(e) => {
                                if let Ok(mut guard) = err.lock() {
                                    let _ = guard.replace(e);
                                }
                                return;
                            }
                        }
                    }
                };

                // Copy to target buffer
                let src = data.as_ref();
                let start = *offset_in_block;
                let len = *to_copy;
                if start < src.len() && len <= src.len() - start {
                    // Defensive assertion: ensure destination write is within bounds
                    debug_assert!(
                        buf_offset + len <= actual_len,
                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                        len,
                        buf_offset,
                        actual_len
                    );
                    let dest = (target_addr + buf_offset) as *mut u8;
                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
                    // `dest` points into the `target` MaybeUninit buffer at a unique
                    // non-overlapping offset (each work item has a distinct `buf_offset`),
                    // and the rayon par_iter ensures each item writes to a disjoint region.
                    // The debug_assert above validates buf_offset + len <= actual_len.
                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
                }
            });

        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
            return Err(e);
        }

        Ok(())
    }

    /// Executes serial decompression for a small number of blocks.
    fn execute_serial_decompression(
        &self,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
            let data = self.resolve_block_data(stream, *block_idx, info)?;
            let src = data.as_ref();
            let start = *offset_in_block;
            if start < src.len() && *to_copy <= src.len() - start {
                // Defensive assertion: ensure destination write is within bounds
                debug_assert!(
                    *buf_offset + *to_copy <= actual_len,
                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                    to_copy,
                    buf_offset,
                    actual_len
                );
                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
                // never exceeds `actual_len` (tracked during work-item collection).
                // The debug_assert above validates this invariant.
                // MaybeUninit<u8> has the same layout as u8.
                unsafe {
                    ptr::copy_nonoverlapping(
                        src[start..].as_ptr(),
                        target[*buf_offset..].as_mut_ptr() as *mut u8,
                        *to_copy,
                    );
                }
            }
        }
        Ok(())
    }

    /// Zero-fills a slice of possibly uninitialized memory.
    ///
    /// This helper centralizes all unsafe zero-filling operations to improve
    /// safety auditing and reduce code duplication. After it returns, every byte
    /// of `buffer` is initialized to zero; the function itself is safe because a
    /// `&mut [MaybeUninit<u8>]` is always valid for writes.
    #[inline]
    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
        if !buffer.is_empty() {
            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
            // buffer.len() zero bytes through its pointer is in-bounds.
            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
        }
    }

    /// Reads into a caller-provided `MaybeUninit` buffer, fully initializing it on success.
    /// Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
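    ///
    /// A minimal sketch, mirroring what [`read_at`](Self::read_at) does internally
    /// (the buffer size is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use std::mem::MaybeUninit;
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
    /// buf.resize_with(4096, MaybeUninit::uninit);
    /// snapshot.read_at_into_uninit(SnapshotStream::Disk, 0, &mut buf)?;
    /// // On success, every byte of `buf` has been written.
    /// # Ok(())
    /// # }
    /// ```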
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }

    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at_into_uninit(stream, offset, target);
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            if let Some(parent) = &self.parent {
                let current_pos = offset + buf_offset as u64;
                parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
            } else {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                std::thread::spawn(move || {
                    let mut buf = vec![MaybeUninit::uninit(); prefetch_len];
                    let _ = snap.read_at_uninit_inner(stream_copy, next_offset, &mut buf, true);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }

    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts an initialized
    /// `&mut [u8]`. Intended for FFI callers (e.g. Python) that hand over a byte buffer.
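    ///
    /// A minimal sketch (buffer size is illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut buf = [0u8; 4096];
    /// snapshot.read_at_into_uninit_bytes(SnapshotStream::Disk, 0, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```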
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }

    /// Fetches an index page from cache or storage.
    ///
    /// Index pages map logical offsets to physical block locations. This method
    /// maintains an LRU cache to avoid repeated deserialization.
    ///
    /// # Parameters
    ///
    /// - `entry`: Page metadata from master index
    ///
    /// # Returns
    ///
    /// A shared reference to the deserialized index page.
    ///
    /// # Thread Safety
    ///
    /// This method acquires a lock on the page cache only for cache lookup and insertion.
    /// I/O and deserialization are performed without holding the lock to avoid blocking
    /// other threads during cache misses.
    fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
        // Fast path: check sharded cache
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }

        // Slow path: I/O and deserialization without holding any lock
        let bytes = self
            .backend
            .read_exact(entry.offset, entry.length as usize)?;
        let page: IndexPage = bincode::deserialize(&bytes)?;
        let arc = Arc::new(page);

        // Check again in case another thread inserted while we were doing I/O
        if let Some(p) = self.page_cache.get(entry.offset) {
            return Ok(p);
        }
        self.page_cache.insert(entry.offset, arc.clone());

        Ok(arc)
    }

    /// Fetches raw compressed block data from cache or storage.
    ///
    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
    /// It:
    /// 1. Checks the block cache
    /// 2. Handles zero-length blocks
    /// 3. Reads raw compressed data from backend
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length)
    ///
    /// # Returns
    ///
    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
    fn fetch_raw_block(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<FetchResult> {
        // Check cache first - return decompressed data if available
        if let Some(data) = self.cache_l1.get(stream, block_idx) {
            return Ok(FetchResult::Decompressed(data));
        }

        // Handle zero blocks
        if info.length == 0 {
            let len = info.logical_len as usize;
            if len == 0 {
                return Ok(FetchResult::Decompressed(Bytes::new()));
            }
            if len == ZEROS_64K.len() {
                return Ok(FetchResult::Decompressed(Bytes::from_static(&ZEROS_64K)));
            }
            return Ok(FetchResult::Decompressed(Bytes::from(vec![0u8; len])));
        }

        // Fetch raw compressed data; this is the portion that benefits from parallel I/O
        self.backend
            .read_exact(info.offset, info.length as usize)
            .map(FetchResult::Compressed)
    }

    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Pre-allocate exact output buffer to avoid over-allocation inside decompressor.
        // We use decompress_into() instead of decompress() to eliminate the allocation
        // and potential reallocation overhead inside the compression library.
        //
        // Performance impact: Avoids zero-initialization overhead (~16% improvement for
        // high-thread-count workloads based on benchmarks).
        let out_len = info.logical_len as usize;
        let mut out = Vec::with_capacity(out_len);

        // SAFETY: This unsafe block is required to create an uninitialized buffer for
        // decompress_into() to write into. This is safe because:
        //
        // 1. Contract guarantee: Both LZ4 and Zstd decompress_into() implementations
        //    promise to either:
        //    a) Write exactly `out.len()` bytes (the full decompressed size), OR
        //    b) Return an Err() if decompression fails (buffer underrun, corruption, etc.)
        //
        // 2. Size accuracy: We set out.len() to info.logical_len, which is the exact
        //    decompressed size recorded in the block metadata during compression.
        //    The decompressor will write exactly this many bytes or fail.
        //
        // 3. Error propagation: If decompress_into() returns Err(), we propagate it
        //    immediately via the ? operator. The uninitialized buffer is dropped
        //    without ever being read.
        //
        // 4. No partial writes: The decompressor APIs do not support partial writes.
        //    They either fully succeed or fully fail. We never access a partially
        //    initialized buffer.
        //
        // 5. Memory safety: We never read from `out` before decompress_into() succeeds.
        //    The only subsequent access is Bytes::from(out), which transfers ownership
        //    of the now-fully-initialized buffer.
        //
        // This is a well-established pattern for zero-copy decompression. The clippy
        // lint is conservative and warns about ANY use of set_len() after with_capacity(),
        // but in this case we have explicit API guarantees from the decompressor.
        #[allow(clippy::uninit_vec)]
        unsafe {
            out.set_len(out_len);
        }

        if let Some(enc) = &self.encryptor {
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress_into(&compressed, &mut out)?;
        } else {
            self.compressor.decompress_into(raw.as_ref(), &mut out)?;
        }

        Ok(Bytes::from(out))
    }

    /// Resolves raw block data by fetching from cache or decompressing from storage.
    ///
    /// This is the core decompression path. It:
    /// 1. Checks the block cache
    /// 2. Reads compressed block from backend
    /// 3. Verifies CRC32 checksum (if stored) and returns `Corruption(block_idx)` on mismatch
    /// 4. Decrypts (if encrypted)
    /// 5. Decompresses
    /// 6. Caches the result
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length, compression)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes` (zero-copy on cache hit).
    ///
    /// # Performance
    ///
    /// This method is the hot path for cache misses. Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn resolve_block_data(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Fetch block (from cache or I/O). The FetchResult tracks whether
        // data is already decompressed, avoiding a TOCTOU race where a
        // background prefetch thread could modify the cache between fetch
        // and the decompression decision.
        match self.fetch_raw_block(stream, block_idx, info)? {
            FetchResult::Decompressed(data) => Ok(data),
            FetchResult::Compressed(raw) => {
                let data = self.decompress_and_verify(raw, block_idx, info)?;
                self.cache_l1.insert(stream, block_idx, data.clone());
                Ok(data)
            }
        }
    }
}
1123}