
hexz_core/api/file.rs

//! High-level snapshot file API and logical stream types.

use crate::algo::compression::{Compressor, create_compressor};
use crate::algo::encryption::Encryptor;
use crate::cache::lru::BlockCache;
use crate::cache::prefetch::Prefetcher;
use crate::format::header::Header;
use crate::format::index::{BlockInfo, IndexPage, MasterIndex, PageEntry};
use crate::format::magic::{HEADER_SIZE, MAGIC_BYTES};
use crate::format::version::{VersionCompatibility, check_version, compatibility_message};
use crate::store::StorageBackend;
use crate::store::local::file::FileBackend;
use bytes::Bytes;
use crc32fast::hash as crc32_hash;
use lru::LruCache;
use std::mem::MaybeUninit;
use std::num::NonZeroUsize;
use std::path::Path;
use std::ptr;
use std::sync::{Arc, Mutex};

use hexz_common::constants::{BLOCK_OFFSET_PARENT, DEFAULT_BLOCK_SIZE};
use hexz_common::{Error, Result};
use rayon::prelude::*;

/// Shared zero block for the default block size to avoid allocating when returning zero blocks.
static ZEROS_64K: [u8; DEFAULT_BLOCK_SIZE as usize] = [0u8; DEFAULT_BLOCK_SIZE as usize];

/// Work item for block decompression: (block_idx, info, buf_offset, offset_in_block, to_copy)
type WorkItem = (u64, BlockInfo, usize, usize, usize);

/// Logical stream identifier for dual-stream snapshots.
///
/// Hexz snapshots can store two independent data streams:
/// - **Disk**: Persistent storage (disk image, filesystem data)
/// - **Memory**: Volatile state (RAM contents, process memory)
///
/// # Example
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// # use std::sync::Arc;
/// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
/// // Read 4KB from disk stream
/// let disk_data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
///
/// // Read 4KB from memory stream (if present)
/// let mem_data = snapshot.read_at(SnapshotStream::Memory, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SnapshotStream {
    /// Persistent disk/storage stream
    Disk = 0,
    /// Volatile memory stream
    Memory = 1,
}

/// Read-only interface for accessing Hexz snapshot data.
///
/// `File` is the primary API for reading compressed, block-indexed snapshots.
/// It handles:
/// - Block-level decompression with LRU caching
/// - Optional AES-256-GCM decryption
/// - Thin snapshot parent chaining
/// - Dual-stream access (disk and memory)
/// - Random access with minimal I/O
///
/// # Thread Safety
///
/// `File` is `Send + Sync` and can be safely shared across threads via `Arc`.
/// Internal caches use `Mutex` for synchronization.
///
/// # Performance
///
/// - **Cache hit latency**: ~80μs (warm cache)
/// - **Cache miss latency**: ~1ms (cold cache, local storage)
/// - **Sequential throughput**: ~2-3 GB/s (NVMe + LZ4)
/// - **Memory overhead**: ~150MB typical (configurable)
///
/// # Examples
///
/// ## Basic Usage
///
/// ```no_run
/// use hexz_core::{File, SnapshotStream};
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
/// let compressor = Box::new(Lz4Compressor::new());
/// let snapshot = File::new(backend, compressor, None)?;
///
/// // Read 4KB at offset 1MB
/// let data = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// ```
///
/// ## Thin Snapshots (with parent)
///
/// ```no_run
/// use hexz_core::File;
/// use hexz_core::store::local::FileBackend;
/// use hexz_core::algo::compression::lz4::Lz4Compressor;
/// use std::sync::Arc;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Open base snapshot
/// let base_backend = Arc::new(FileBackend::new("base.hxz".as_ref())?);
/// let base = File::new(
///     base_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // The thin snapshot will automatically load its parent based on
/// // the parent_path field in the header
/// let thin_backend = Arc::new(FileBackend::new("incremental.hxz".as_ref())?);
/// let thin = File::new(
///     thin_backend,
///     Box::new(Lz4Compressor::new()),
///     None
/// )?;
///
/// // Reads automatically fall back to base for unchanged blocks
/// let data = thin.read_at(hexz_core::SnapshotStream::Disk, 0, 4096)?;
/// # Ok(())
/// # }
/// ```
pub struct File {
    /// Snapshot metadata (sizes, compression, encryption settings)
    pub header: Header,

    /// Master index containing top-level page entries
    master: MasterIndex,

    /// Storage backend for reading raw snapshot data
    backend: Arc<dyn StorageBackend>,

    /// Compression algorithm (LZ4 or Zstandard)
    compressor: Box<dyn Compressor>,

    /// Optional encryption (AES-256-GCM)
    encryptor: Option<Box<dyn Encryptor>>,

    /// Optional parent snapshot for thin (incremental) snapshots.
    /// When a block's offset is BLOCK_OFFSET_PARENT, data is fetched from parent.
    parent: Option<Arc<File>>,

    /// LRU cache for decompressed blocks (per-stream, per-block-index)
    cache_l1: BlockCache,

    /// LRU cache for deserialized index pages
    page_cache: Mutex<LruCache<u64, Arc<IndexPage>>>,

    /// Optional prefetcher for background data loading
    prefetcher: Option<Prefetcher>,
}

impl File {
    /// Opens a snapshot, auto-detecting compression and dictionary from the header.
    ///
    /// This removes the three-step boilerplate of reading the header, loading the
    /// dictionary, and creating the compressor. Equivalent to
    /// `File::new(backend, auto_compressor, encryptor)`.
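    ///
    /// # Examples
    ///
    /// A minimal sketch of the auto-detecting constructor; the snapshot path is
    /// illustrative and the setup mirrors the [`new`](Self::new) example:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// // Compression and dictionary are detected from the header, so no compressor is passed.
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let snapshot = File::open(backend, None)?;
    ///
    /// let data = snapshot.read_at(SnapshotStream::Disk, 0, 4096)?;
    /// # Ok(())
    /// # }
    /// ```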
    pub fn open(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::open_with_cache(backend, encryptor, None, None)
    }

    /// Like [`open`](Self::open) but with custom cache and prefetch settings.
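    ///
    /// # Examples
    ///
    /// A minimal sketch; the cache size and prefetch window are illustrative values
    /// mirroring the [`with_cache`](Self::with_cache) example:
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// // 256MB block cache, prefetch 4 blocks ahead; compression auto-detected from the header.
    /// let snapshot = File::open_with_cache(backend, None, Some(256 * 1024 * 1024), Some(4))?;
    /// # Ok(())
    /// # }
    /// ```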
    pub fn open_with_cache(
        backend: Arc<dyn StorageBackend>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header = Header::read_from_backend(backend.as_ref())?;
        let dictionary = header.load_dictionary(backend.as_ref())?;
        let compressor = create_compressor(header.compression, None, dictionary);
        Self::with_cache(
            backend,
            compressor,
            encryptor,
            cache_capacity_bytes,
            prefetch_window_size,
        )
    }

    /// Opens a Hexz snapshot with default cache settings.
    ///
    /// This is the primary constructor for `File`. It:
    /// 1. Reads and validates the snapshot header (magic bytes, version)
    /// 2. Deserializes the master index
    /// 3. Recursively loads parent snapshots (for thin snapshots)
    /// 4. Initializes block and page caches
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend (local file, HTTP, S3, etc.)
    /// - `compressor`: Compression algorithm matching the snapshot format
    /// - `encryptor`: Optional decryption handler (pass `None` for unencrypted snapshots)
    ///
    /// # Returns
    ///
    /// - `Ok(Arc<File>)` on success
    /// - `Err(Error::Format)` if magic bytes or version are invalid
    /// - `Err(Error::Io)` if storage backend fails
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    /// let snapshot = File::new(backend, compressor, None)?;
    ///
    /// println!("Disk size: {} bytes", snapshot.size(SnapshotStream::Disk));
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
    ) -> Result<Arc<Self>> {
        Self::with_cache(backend, compressor, encryptor, None, None)
    }

    /// Opens a Hexz snapshot with custom cache capacity and prefetching.
    ///
    /// Identical to [`new`](Self::new) but allows specifying cache size and prefetch window.
    ///
    /// # Parameters
    ///
    /// - `backend`: Storage backend
    /// - `compressor`: Compression algorithm
    /// - `encryptor`: Optional decryption handler
    /// - `cache_capacity_bytes`: Block cache size in bytes (default: 1000 blocks, ~4MB at a 4KB block size)
    /// - `prefetch_window_size`: Number of blocks to prefetch ahead (default: disabled)
    ///
    /// # Cache Sizing
    ///
    /// The cache stores decompressed blocks. Given a block size of 4KB:
    /// - `Some(100_000_000)` → ~24,000 blocks (~96MB effective)
    /// - `None` → 1000 blocks (~4MB effective)
    ///
    /// Larger caches reduce repeated decompression but increase memory usage.
    ///
    /// # Prefetching
    ///
    /// When `prefetch_window_size` is set, the system will automatically fetch the next blocks
    /// in the background after each read, optimizing sequential access patterns:
    /// - `Some(4)` → Prefetch 4 blocks ahead
    /// - `None` or `Some(0)` → Disable prefetching
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::File;
    /// use hexz_core::store::local::FileBackend;
    /// use hexz_core::algo::compression::lz4::Lz4Compressor;
    /// use std::sync::Arc;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let backend = Arc::new(FileBackend::new("snapshot.hxz".as_ref())?);
    /// let compressor = Box::new(Lz4Compressor::new());
    ///
    /// // Allocate 256MB for cache, prefetch 4 blocks ahead
    /// let snapshot = File::with_cache(
    ///     backend,
    ///     compressor,
    ///     None,
    ///     Some(256 * 1024 * 1024),
    ///     Some(4)
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cache(
        backend: Arc<dyn StorageBackend>,
        compressor: Box<dyn Compressor>,
        encryptor: Option<Box<dyn Encryptor>>,
        cache_capacity_bytes: Option<usize>,
        prefetch_window_size: Option<u32>,
    ) -> Result<Arc<Self>> {
        let header_bytes = backend.read_exact(0, HEADER_SIZE)?;
        let header: Header = bincode::deserialize(&header_bytes)?;

        if &header.magic != MAGIC_BYTES {
            return Err(Error::Format("Invalid magic bytes".into()));
        }

        // Check version compatibility
        let compatibility = check_version(header.version);
        match compatibility {
            VersionCompatibility::Full => {
                // Perfect match, proceed silently
            }
            VersionCompatibility::Degraded => {
                // Newer version, issue warning but allow
                tracing::warn!("{}", compatibility_message(header.version));
            }
            VersionCompatibility::Incompatible => {
                // Too old or too new, reject
                return Err(Error::Format(compatibility_message(header.version)));
            }
        }

        let index_bytes = backend.read_exact(
            header.index_offset,
            (backend.len() - header.index_offset) as usize,
        )?;

        let master: MasterIndex = bincode::deserialize(&index_bytes)?;

        // Recursively load parent if present
        let parent = if let Some(parent_path) = &header.parent_path {
            tracing::info!("Loading parent snapshot: {}", parent_path);
            let p_backend = Arc::new(FileBackend::new(Path::new(parent_path))?);
            Some(File::open(p_backend, None)?)
        } else {
            None
        };

        let block_size = header.block_size as usize;
        let l1_capacity = if let Some(bytes) = cache_capacity_bytes {
            (bytes / block_size).max(1)
        } else {
            1000
        };

        // Initialize prefetcher if window size is specified and > 0
        let prefetcher = prefetch_window_size.filter(|&w| w > 0).map(Prefetcher::new);

        Ok(Arc::new(Self {
            header,
            master,
            backend,
            compressor,
            encryptor,
            parent,
            cache_l1: BlockCache::with_capacity(l1_capacity),
            page_cache: Mutex::new(LruCache::new(
                NonZeroUsize::new(128).unwrap_or(NonZeroUsize::MIN),
            )),
            prefetcher,
        }))
    }

    /// Returns the logical size of a stream in bytes.
    ///
    /// # Parameters
    ///
    /// - `stream`: The stream to query (Disk or Memory)
    ///
    /// # Returns
    ///
    /// The uncompressed, logical size of the stream. This is the size you would
    /// get if you decompressed all blocks and concatenated them.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) {
    /// let disk_bytes = snapshot.size(SnapshotStream::Disk);
    /// let mem_bytes = snapshot.size(SnapshotStream::Memory);
    ///
    /// println!("Disk: {} GB", disk_bytes / (1024 * 1024 * 1024));
    /// println!("Memory: {} MB", mem_bytes / (1024 * 1024));
    /// # }
    /// ```
    pub fn size(&self, stream: SnapshotStream) -> u64 {
        match stream {
            SnapshotStream::Disk => self.master.disk_size,
            SnapshotStream::Memory => self.master.memory_size,
        }
    }

    /// Reads data from a snapshot stream at a given offset.
    ///
    /// This is the primary read method for random access. It:
    /// 1. Identifies which blocks overlap the requested range
    /// 2. Fetches blocks from cache or decompresses from storage
    /// 3. Handles thin snapshot fallback to parent
    /// 4. Assembles the final buffer from block slices
    ///
    /// When the requested range spans multiple blocks, block fetch and decompression
    /// run in parallel.
    ///
    /// # Parameters
    ///
    /// - `stream`: Which stream to read from (Disk or Memory)
    /// - `offset`: Starting byte offset (0-indexed)
    /// - `len`: Number of bytes to read
    ///
    /// # Returns
    ///
    /// A `Vec<u8>` containing up to `len` bytes. The returned vector may be shorter
    /// if:
    /// - `offset` is beyond the stream size (returns empty vector)
    /// - `offset + len` exceeds stream size (returns partial data)
    ///
    /// Missing data (sparse regions) is zero-filled.
    ///
    /// # Errors
    ///
    /// - `Error::Io` if backend read fails (e.g. truncated file)
    /// - `Error::Corruption(block_idx)` if block checksum does not match
    /// - `Error::Decompression` if block decompression fails
    /// - `Error::Decryption` if block decryption fails
    ///
    /// # Performance
    ///
    /// - **Cache hit**: ~80μs latency, no I/O
    /// - **Cache miss**: ~1ms latency (local storage), includes decompression
    /// - **Remote storage**: Latency depends on network (HTTP: ~50ms, S3: ~100ms)
    ///
    /// Aligned reads (offset % block_size == 0) are most efficient.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Read first 512 bytes of disk stream
    /// let boot_sector = snapshot.read_at(SnapshotStream::Disk, 0, 512)?;
    ///
    /// // Read from arbitrary offset
    /// let chunk = snapshot.read_at(SnapshotStream::Disk, 1024 * 1024, 4096)?;
    ///
    /// // Reading beyond stream size returns empty vector
    /// let empty = snapshot.read_at(SnapshotStream::Disk, u64::MAX, 100)?;
    /// assert!(empty.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn read_at(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        len: usize,
    ) -> Result<Vec<u8>> {
        let stream_size = self.size(stream);
        if offset >= stream_size {
            return Ok(Vec::new());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len == 0 {
            return Ok(Vec::new());
        }

        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at(stream, offset, actual_len);
            }
            return Ok(vec![0u8; actual_len]);
        }

        let mut buf: Vec<MaybeUninit<u8>> = Vec::new();
        buf.resize_with(actual_len, MaybeUninit::uninit);
        self.read_at_into_uninit(stream, offset, &mut buf)?;
        let ptr = buf.as_mut_ptr().cast::<u8>();
        let len = buf.len();
        let cap = buf.capacity();
        std::mem::forget(buf);
        // SAFETY: `buf` was a Vec<MaybeUninit<u8>> that we fully initialized via
        // `read_at_into_uninit` (which writes every byte). We `forget` the original
        // Vec to avoid a double-free and reconstruct it with the same ptr/len/cap.
        // MaybeUninit<u8> has the same layout as u8.
        Ok(unsafe { Vec::from_raw_parts(ptr, len, cap) })
    }

    /// Reads into a provided buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
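    ///
    /// # Examples
    ///
    /// A minimal usage sketch (offset and buffer size are illustrative):
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Reuse a caller-owned buffer instead of allocating a new Vec per read.
    /// let mut buf = vec![0u8; 4096];
    /// snapshot.read_at_into(SnapshotStream::Disk, 1024 * 1024, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```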
    pub fn read_at_into(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [u8],
    ) -> Result<()> {
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }
        let stream_size = self.size(stream);
        if offset >= stream_size {
            buffer.fill(0);
            return Ok(());
        }
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            buffer[actual_len..].fill(0);
        }
        self.read_at_into_uninit_bytes(stream, offset, &mut buffer[0..actual_len])
    }

    /// Minimum number of local blocks to use the parallel decompression path.
    const PARALLEL_MIN_BLOCKS: usize = 2;

    /// Collects work items for blocks that need decompression.
    ///
    /// This method iterates through index pages and blocks, handling:
    /// - Parent blocks: delegate to parent snapshot or zero-fill
    /// - Zero blocks: zero-fill directly
    /// - Regular blocks: add to work queue for later decompression
    ///
    /// Returns the queued work items together with the number of bytes of the
    /// request already accounted for (the final `buf_offset`).
    fn collect_work_items(
        &self,
        stream: SnapshotStream,
        pages: &[PageEntry],
        page_idx: usize,
        target: &mut [MaybeUninit<u8>],
        offset: u64,
        actual_len: usize,
    ) -> Result<(Vec<WorkItem>, usize)> {
        let mut local_work: Vec<WorkItem> = Vec::new();
        let mut buf_offset = 0;
        let mut current_pos = offset;
        let mut remaining = actual_len;

        for page_entry in pages.iter().skip(page_idx) {
            if remaining == 0 {
                break;
            }
            if page_entry.start_logical > current_pos + remaining as u64 {
                break;
            }

            let page = self.get_page(page_entry)?;
            let mut block_logical_start = page_entry.start_logical;

            for (block_idx_in_page, block) in page.blocks.iter().enumerate() {
                let block_end = block_logical_start + block.logical_len as u64;

                if block_end > current_pos {
                    let global_block_idx = page_entry.start_block + block_idx_in_page as u64;
                    let offset_in_block = (current_pos - block_logical_start) as usize;
                    let to_copy = std::cmp::min(
                        remaining,
                        (block.logical_len as usize).saturating_sub(offset_in_block),
                    );

                    if block.offset == BLOCK_OFFSET_PARENT {
                        // Parent block: delegate or zero-fill
                        if let Some(parent) = &self.parent {
                            let dest = &mut target[buf_offset..buf_offset + to_copy];
                            parent.read_at_into_uninit(stream, current_pos, dest)?;
                        } else {
                            Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        }
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else if block.length == 0 {
                        // Zero block: fill with zeros
                        Self::zero_fill_uninit(&mut target[buf_offset..buf_offset + to_copy]);
                        current_pos += to_copy as u64;
                        buf_offset += to_copy;
                        remaining -= to_copy;
                    } else {
                        // Regular block: add to work queue
                        if to_copy > 0 {
                            local_work.push((
                                global_block_idx,
                                *block,
                                buf_offset,
                                offset_in_block,
                                to_copy,
                            ));
                            buf_offset += to_copy;
                            current_pos += to_copy as u64;
                            remaining -= to_copy;
                        }
                    }

                    if remaining == 0 {
                        break;
                    }
                }
                block_logical_start += block.logical_len as u64;
            }
        }

        Ok((local_work, buf_offset))
    }

    /// Executes parallel decompression for multiple blocks.
    ///
    /// Uses a two-phase approach:
    /// 1. Parallel I/O: Fetch all raw blocks concurrently
    /// 2. Parallel CPU: Decompress and copy to target buffer
    fn execute_parallel_decompression(
        self: &Arc<Self>,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        let snap = Arc::clone(self);
        let target_addr = target.as_mut_ptr() as usize;

        // Phase 1: Parallel fetch all raw blocks
        let raw_blocks: Vec<Result<Bytes>> = work_items
            .par_iter()
            .map(|(block_idx, info, _, _, _)| snap.fetch_raw_block(stream, *block_idx, info))
            .collect();

        // Phase 2: Parallel decompress and copy
        let err: Mutex<Option<Error>> = Mutex::new(None);
        work_items
            .par_iter()
            .zip(raw_blocks)
            .for_each(|(work_item, raw_result)| {
                if err.lock().map_or(true, |e| e.is_some()) {
                    return;
                }

                let (block_idx, info, buf_offset, offset_in_block, to_copy) = work_item;

                // Handle fetch errors
                let raw = match raw_result {
                    Ok(r) => r,
                    Err(e) => {
                        if let Ok(mut guard) = err.lock() {
                            let _ = guard.replace(e);
                        }
                        return;
                    }
                };

                // If cache hit or zero block, raw is already decompressed
                let data = if info.length == 0 || snap.cache_l1.get(stream, *block_idx).is_some() {
                    raw
                } else {
                    // Decompress and verify
                    match snap.decompress_and_verify(raw, *block_idx, info) {
                        Ok(d) => {
                            // Cache the result
                            snap.cache_l1.insert(stream, *block_idx, d.clone());
                            d
                        }
                        Err(e) => {
                            if let Ok(mut guard) = err.lock() {
                                let _ = guard.replace(e);
                            }
                            return;
                        }
                    }
                };

                // Copy to target buffer
                let src = data.as_ref();
                let start = *offset_in_block;
                let len = *to_copy;
                if start < src.len() && len <= src.len() - start {
                    // Defensive assertion: ensure destination write is within bounds
                    debug_assert!(
                        buf_offset + len <= actual_len,
                        "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                        len,
                        buf_offset,
                        actual_len
                    );
                    let dest = (target_addr + buf_offset) as *mut u8;
                    // SAFETY: `src[start..start+len]` is in-bounds (checked above).
                    // `dest` points into the `target` MaybeUninit buffer at a unique
                    // non-overlapping offset (each work item has a distinct `buf_offset`),
                    // and the rayon par_iter ensures each item writes to a disjoint region.
                    // The debug_assert above validates buf_offset + len <= actual_len.
                    unsafe { ptr::copy_nonoverlapping(src[start..].as_ptr(), dest, len) };
                }
            });

        if let Some(e) = err.lock().ok().and_then(|mut guard| guard.take()) {
            return Err(e);
        }

        Ok(())
    }

    /// Executes serial decompression for a small number of blocks.
    fn execute_serial_decompression(
        &self,
        stream: SnapshotStream,
        work_items: &[WorkItem],
        target: &mut [MaybeUninit<u8>],
        actual_len: usize,
    ) -> Result<()> {
        for (block_idx, info, buf_offset, offset_in_block, to_copy) in work_items {
            let data = self.resolve_block_data(stream, *block_idx, info)?;
            let src = data.as_ref();
            let start = *offset_in_block;
            if start < src.len() && *to_copy <= src.len() - start {
                // Defensive assertion: ensure destination write is within bounds
                debug_assert!(
                    *buf_offset + *to_copy <= actual_len,
                    "Buffer overflow: attempting to write {} bytes at offset {} into buffer of length {}",
                    to_copy,
                    buf_offset,
                    actual_len
                );
                // SAFETY: `src[start..start+to_copy]` is in-bounds (checked above).
                // `target[buf_offset..]` has sufficient room because `buf_offset + to_copy`
                // never exceeds `actual_len` (tracked during work-item collection).
                // The debug_assert above validates this invariant.
                // MaybeUninit<u8> has the same layout as u8.
                unsafe {
                    ptr::copy_nonoverlapping(
                        src[start..].as_ptr(),
                        target[*buf_offset..].as_mut_ptr() as *mut u8,
                        *to_copy,
                    );
                }
            }
        }
        Ok(())
    }

    /// Zero-fills a slice of uninitialized memory.
    ///
    /// This helper centralizes all unsafe zero-filling operations to improve
    /// safety auditing and reduce code duplication.
    ///
    /// The function itself is safe to call: writing zeros through the slice's
    /// pointer leaves every byte initialized, and the pointer's validity is
    /// guaranteed by the `&mut` borrow.
    #[inline]
    fn zero_fill_uninit(buffer: &mut [MaybeUninit<u8>]) {
        if !buffer.is_empty() {
            // SAFETY: buffer is a valid &mut [MaybeUninit<u8>] slice, so writing
            // buffer.len() zero bytes through its pointer is in-bounds.
            unsafe { ptr::write_bytes(buffer.as_mut_ptr(), 0, buffer.len()) };
        }
    }

    /// Reads into an uninitialized buffer. Unused suffix is zero-filled. Uses parallel decompression when spanning multiple blocks.
    ///
    /// **On error:** The buffer contents are undefined (possibly partially written).
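    ///
    /// # Examples
    ///
    /// A minimal usage sketch (the buffer size is illustrative). On success every
    /// byte of the buffer has been written, so it may be treated as initialized:
    ///
    /// ```no_run
    /// use hexz_core::{File, SnapshotStream};
    /// use std::mem::MaybeUninit;
    /// # use std::sync::Arc;
    /// # fn example(snapshot: Arc<File>) -> Result<(), Box<dyn std::error::Error>> {
    /// // Uninitialized scratch buffer; avoids zeroing memory that will be overwritten anyway.
    /// let mut buf = vec![MaybeUninit::<u8>::uninit(); 4096];
    /// snapshot.read_at_into_uninit(SnapshotStream::Disk, 0, &mut buf)?;
    /// # Ok(())
    /// # }
    /// ```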
    pub fn read_at_into_uninit(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
    ) -> Result<()> {
        self.read_at_uninit_inner(stream, offset, buffer, false)
    }

    /// Inner implementation of [`read_at_into_uninit`](Self::read_at_into_uninit).
    ///
    /// The `is_prefetch` flag prevents recursive prefetch thread spawning:
    /// when `true`, the prefetch block is skipped to avoid unbounded thread creation.
    fn read_at_uninit_inner(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buffer: &mut [MaybeUninit<u8>],
        is_prefetch: bool,
    ) -> Result<()> {
        // Early validation
        let len = buffer.len();
        if len == 0 {
            return Ok(());
        }

        let stream_size = self.size(stream);
        if offset >= stream_size {
            Self::zero_fill_uninit(buffer);
            return Ok(());
        }

        // Calculate actual read length and zero-fill suffix if needed
        let actual_len = std::cmp::min(len as u64, stream_size - offset) as usize;
        if actual_len < len {
            Self::zero_fill_uninit(&mut buffer[actual_len..]);
        }

        let target = &mut buffer[0..actual_len];

        // Get page list for stream
        let pages = match stream {
            SnapshotStream::Disk => &self.master.disk_pages,
            SnapshotStream::Memory => &self.master.memory_pages,
        };

        // Delegate to parent if no index pages
        if pages.is_empty() {
            if let Some(parent) = &self.parent {
                return parent.read_at_into_uninit(stream, offset, target);
            }
            Self::zero_fill_uninit(target);
            return Ok(());
        }

        // Find starting page index
        let page_idx = match pages.binary_search_by(|p| p.start_logical.cmp(&offset)) {
            Ok(idx) => idx,
            Err(idx) => idx.saturating_sub(1),
        };

        // Collect work items (handles parent blocks, zero blocks, and queues regular blocks)
        let (work_items, buf_offset) =
            self.collect_work_items(stream, pages, page_idx, target, offset, actual_len)?;

        // Choose parallel or serial decompression based on work item count
        if work_items.len() >= Self::PARALLEL_MIN_BLOCKS {
            self.execute_parallel_decompression(stream, &work_items, target, actual_len)?;
        } else {
            self.execute_serial_decompression(stream, &work_items, target, actual_len)?;
        }

        // Handle any remaining unprocessed data
        let remaining = actual_len - buf_offset;
        if remaining > 0 {
            if let Some(parent) = &self.parent {
                let current_pos = offset + buf_offset as u64;
                parent.read_at_into_uninit(stream, current_pos, &mut target[buf_offset..])?;
            } else {
                Self::zero_fill_uninit(&mut target[buf_offset..]);
            }
        }

        // Trigger prefetch for next sequential blocks if enabled.
        // Guards:
        // 1. `is_prefetch` prevents recursive spawning (prefetch thread spawning another)
        // 2. `try_start()` limits to one in-flight prefetch at a time, preventing
        //    unbounded thread creation under rapid sequential reads
        if let Some(prefetcher) = &self.prefetcher {
            if !is_prefetch && !work_items.is_empty() && prefetcher.try_start() {
                let next_offset = offset + actual_len as u64;
                let prefetch_len = (self.header.block_size * 4) as usize;
                let snap = Arc::clone(self);
                let stream_copy = stream;
                std::thread::spawn(move || {
                    let mut buf = vec![MaybeUninit::uninit(); prefetch_len];
                    let _ = snap.read_at_uninit_inner(stream_copy, next_offset, &mut buf, true);
                    // Release the in-flight guard so the next read can prefetch
                    if let Some(pf) = &snap.prefetcher {
                        pf.clear_in_flight();
                    }
                });
            }
        }

        Ok(())
    }

    /// Like [`read_at_into_uninit`](Self::read_at_into_uninit) but accepts `&mut [u8]`. Use from FFI (e.g. Python).
    #[inline]
    pub fn read_at_into_uninit_bytes(
        self: &Arc<Self>,
        stream: SnapshotStream,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<()> {
        if buf.is_empty() {
            return Ok(());
        }
        // SAFETY: &mut [u8] and &mut [MaybeUninit<u8>] have identical layout (both
        // are slices of single-byte types). Initialized u8 values are valid MaybeUninit<u8>.
        // The borrow is derived from `buf` so no aliasing occurs.
        let uninit = unsafe { &mut *(buf as *mut [u8] as *mut [MaybeUninit<u8>]) };
        self.read_at_into_uninit(stream, offset, uninit)
    }

    /// Fetches an index page from cache or storage.
    ///
    /// Index pages map logical offsets to physical block locations. This method
    /// maintains an LRU cache to avoid repeated deserialization.
    ///
    /// # Parameters
    ///
    /// - `entry`: Page metadata from master index
    ///
    /// # Returns
    ///
    /// A shared reference to the deserialized index page.
    ///
    /// # Thread Safety
    ///
    /// This method acquires a lock on the page cache only for cache lookup and insertion.
    /// I/O and deserialization are performed without holding the lock to avoid blocking
    /// other threads during cache misses.
    fn get_page(&self, entry: &PageEntry) -> Result<Arc<IndexPage>> {
        // Fast path: check cache with lock held
        {
            let mut cache = self
                .page_cache
                .lock()
                .map_err(|_| Error::Io(std::io::Error::other("Page cache lock poisoned")))?;
            if let Some(p) = cache.get(&entry.offset) {
                return Ok(p.clone());
            }
        }

        // Slow path: release lock before I/O and deserialization
        let bytes = self
            .backend
            .read_exact(entry.offset, entry.length as usize)?;
        let page: IndexPage = bincode::deserialize(&bytes)?;
        let arc = Arc::new(page);

        // Re-acquire lock only for insertion
        {
            let mut cache = self
                .page_cache
                .lock()
                .map_err(|_| Error::Io(std::io::Error::other("Page cache lock poisoned")))?;
            // Check again in case another thread inserted while we were doing I/O
            if let Some(p) = cache.get(&entry.offset) {
                return Ok(p.clone());
            }
            cache.put(entry.offset, arc.clone());
        }

        Ok(arc)
    }

    /// Fetches raw compressed block data from cache or storage.
    ///
    /// This is the I/O portion of block resolution, separated to enable parallel I/O.
    /// It:
    /// 1. Checks the block cache
    /// 2. Handles zero-length blocks
    /// 3. Reads raw compressed data from backend
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length)
    ///
    /// # Returns
    ///
    /// Raw block data (potentially compressed/encrypted) or cached decompressed data.
    fn fetch_raw_block(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Check cache first - return decompressed data if available
        if let Some(data) = self.cache_l1.get(stream, block_idx) {
            return Ok(data);
        }

        // Handle zero blocks
        if info.length == 0 {
            let len = info.logical_len as usize;
            if len == 0 {
                return Ok(Bytes::new());
            }
            if len == ZEROS_64K.len() {
                return Ok(Bytes::from_static(&ZEROS_64K));
            }
            return Ok(Bytes::from(vec![0u8; len]));
        }

        // Fetch raw compressed data (this is the part that runs in parallel)
        self.backend.read_exact(info.offset, info.length as usize)
    }

    /// Decompresses and verifies a raw block.
    ///
    /// This is the CPU portion of block resolution, separated to enable parallel decompression.
    /// It:
    /// 1. Verifies CRC32 checksum
    /// 2. Decrypts (if encrypted)
    /// 3. Decompresses
    ///
    /// # Parameters
    ///
    /// - `raw`: Raw block data (potentially compressed/encrypted)
    /// - `block_idx`: Global block index (for error reporting and decryption)
    /// - `info`: Block metadata (checksum)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes`.
    ///
    /// # Performance
    ///
    /// Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn decompress_and_verify(&self, raw: Bytes, block_idx: u64, info: &BlockInfo) -> Result<Bytes> {
        // Verify stored checksum (CRC32 of compressed/encrypted data) before decrypt/decompress
        if info.checksum != 0 {
            let computed = crc32_hash(&raw);
            if computed != info.checksum {
                return Err(Error::Corruption(block_idx));
            }
        }

        // Decrypt and decompress
        let decompressed = if let Some(enc) = &self.encryptor {
            let compressed = enc.decrypt(&raw, block_idx)?;
            self.compressor.decompress(&compressed)?
        } else {
            self.compressor.decompress(raw.as_ref())?
        };

        Ok(Bytes::from(decompressed))
    }

    /// Resolves block data by fetching from cache or decompressing from storage.
    ///
    /// This is the core decompression path. It:
    /// 1. Checks the block cache
    /// 2. Reads compressed block from backend
    /// 3. Verifies CRC32 checksum (if stored) and returns `Corruption(block_idx)` on mismatch
    /// 4. Decrypts (if encrypted)
    /// 5. Decompresses
    /// 6. Caches the result
    ///
    /// # Parameters
    ///
    /// - `stream`: Stream identifier (for cache key)
    /// - `block_idx`: Global block index
    /// - `info`: Block metadata (offset, length, compression)
    ///
    /// # Returns
    ///
    /// Decompressed block data as `Bytes` (zero-copy on cache hit).
    ///
    /// # Performance
    ///
    /// This method is the hot path for cache misses. Decompression throughput:
    /// - LZ4: ~2 GB/s per core
    /// - Zstd: ~500 MB/s per core
    fn resolve_block_data(
        &self,
        stream: SnapshotStream,
        block_idx: u64,
        info: &BlockInfo,
    ) -> Result<Bytes> {
        // Fetch raw block (from cache or I/O)
        let raw = self.fetch_raw_block(stream, block_idx, info)?;

        // If cache hit or zero block, raw is already decompressed
        if info.length == 0 || self.cache_l1.get(stream, block_idx).is_some() {
            return Ok(raw);
        }

        // Decompress and verify
        let data = self.decompress_and_verify(raw, block_idx, info)?;

        // Cache the result
        self.cache_l1.insert(stream, block_idx, data.clone());
        Ok(data)
    }
}